In December, we had a support ticket in which a customer complained that their tests intermittently ran for a long time and then lost the output at the end, which made them frustratingly hard to debug.

This functionality relies on at least two components: a service that captures streamed output, and a binary that executes in-dyno, runs the tests, and streams the test output to the streaming service.

Initially, I looked into the streamed-output service: it stores the output in Redis temporarily, and when the stream is closed, stores it more permanently in S3.

The output expires from Redis (to keep memory consumption to a minimum), and the basic issue was that the binary wasn’t signalling that the stream should be closed. I looked at options for watching for the expiry and storing the output: Redis keyspace-notification events would mean that I could watch for expiring keys, but there’s no way to prevent keys being deleted, and the delete notification arrives after the fact.
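
As an illustration of what that looks like, here’s a minimal sketch of subscribing to expiry events, assuming the go-redis client (github.com/go-redis/redis/v8); the real service may use something else entirely. The problem is visible in the loop: by the time the event arrives, the key, and the output it held, is already gone.

package main

import (
	"context"
	"log"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	// Redis only publishes expiry events if configured to, e.g. with
	// CONFIG SET notify-keyspace-events Ex
	pubsub := rdb.PSubscribe(ctx, "__keyevent@0__:expired")
	defer pubsub.Close()
	for msg := range pubsub.Channel() {
		// The notification arrives after the key has been deleted, so the
		// streamed output it held can no longer be read and stored.
		log.Printf("key expired (too late to save it): %s", msg.Payload)
	}
}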

It would be possible to have two keys, and track the first key expiring as a warning that the second was due to expire. This could work, but it wouldn’t solve the problem that the end of the streamed output was being lost.
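
For completeness, here’s a sketch of that two-key idea, again assuming go-redis, with hypothetical key names and TTLs: the short-lived “warning” key expires a few minutes early, and its expiry event would be the cue to persist the real key before it too disappears.

// Package stream is a hypothetical home for this helper.
package stream

import (
	"context"
	"time"

	"github.com/go-redis/redis/v8"
)

// StoreWithWarning is a hypothetical helper: the real key holds the streamed
// output for an hour, while the warning key expires five minutes earlier; its
// expiry notification would be the cue to persist the output before the real
// key also disappears.
func StoreWithWarning(ctx context.Context, rdb *redis.Client, id string, output []byte) error {
	if err := rdb.Set(ctx, "stream:"+id, output, 60*time.Minute).Err(); err != nil {
		return err
	}
	return rdb.Set(ctx, "stream:"+id+":warning", 1, 55*time.Minute).Err()
}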

So, back to the test-runner, and it turned out that it just wasn’t handling SIGTERM.

A simplified version of the code looks like this…

func main() {
	defer func() {
		log.Println("uploading output")
	}()
	cmd := exec.Command("bash", "-c", "watch date &")
	var outb, errb bytes.Buffer
	log.Printf("about to run - process has PID %d\n", os.Getpid())
	cmd.Stdout = &outb
	cmd.Stderr = &errb
	if err := cmd.Run(); err != nil {
		log.Printf("err was not nil: %v", err)
	}
	log.Println("Process terminated normally")
}

This relies on the defer to upload the output from running the test.

This code has no signal handling, so a SIGTERM results in the immediate termination of the process, and with that, no opportunity to run the defer and upload the output.

$ go run runner.go

and in a separate terminal

$ kill -TERM <pid>
2019/02/20 07:17:31 about to run - process has PID 83638
signal: terminated

It’s clear that the running process is not executing the defer…

First attempt at a fix

func main() {
	defer func() {
		log.Println("executing the defer")
	}()
	ctx := cancelOnTermination(context.WithCancel(context.Background()))
	cmd := exec.CommandContext(ctx, "bash", "-c", "watch date &")
	var outb, errb bytes.Buffer
	log.Printf("about to run - process has PID %d\n", os.Getpid())
	cmd.Stdout = &outb
	cmd.Stderr = &errb
	if err := cmd.Run(); err != nil {
		log.Printf("err was not nil: %v", err)
	}
	log.Println("Process terminated normally %s - %s", outb.String(), errb.String())
}

func cancelOnTermination(ctx context.Context, cancel context.CancelFunc) context.Context {
	log.Println("setting up a signal handler")
	s := make(chan os.Signal, 1)
	signal.Notify(s, syscall.SIGTERM)
	go func() {
		log.Printf("received SIGTERM %v\n", <-s)
		cancel()
	}()
	return ctx
}

This adds a bit of additional complexity: there’s a signal handler for SIGTERM that triggers the cancellation of the context, which in turn sends a SIGKILL to the child process.

This is pretty much the canonical use for exec.CommandContext(): terminating when a command times out.
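
As a standalone sketch of that canonical use (this is a separate example, not runner.go): a deadline on the context kills the spawned process when it runs too long.

package main

import (
	"context"
	"log"
	"os/exec"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	// sleep outlives the two-second deadline, so CommandContext kills it and
	// Run returns an error (typically "signal: killed").
	if err := exec.CommandContext(ctx, "sleep", "10").Run(); err != nil {
		log.Printf("command failed: %v", err)
	}
}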

$ go run runner.go
2019/02/20 07:36:00 setting up a signal handler
2019/02/20 07:36:00 about to run - process has PID 84998
2019/02/20 07:36:06 received SIGTERM terminated

And at this point, the process hangs. Again, the defer doesn’t run, because cmd.Run() hasn’t returned: cancelling the context only kills the immediate child (bash), while the backgrounded watch process keeps the stdout/stderr pipes open, so Run() keeps waiting, and that means no upload.

We can ^C the process and it will die, but still no upload.
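
The hang is easy to reproduce in isolation. Here’s a minimal standalone sketch (not the test-runner code): bash exits almost immediately, but the backgrounded child inherits the pipe that backs cmd.Stdout, so the copy into the buffer, and therefore Run(), only finishes when that child exits.

package main

import (
	"bytes"
	"log"
	"os/exec"
)

func main() {
	cmd := exec.Command("bash", "-c", "sleep 60 &")
	var out bytes.Buffer
	cmd.Stdout = &out
	log.Println("starting")
	// Blocks for roughly 60 seconds: bash exits straight away, but the
	// backgrounded sleep still holds the write end of the stdout pipe.
	if err := cmd.Run(); err != nil {
		log.Printf("err was not nil: %v", err)
	}
	log.Println("finished")
}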

Second attempt at a fix

func main() {
	defer func() {
		log.Println("executing the defer")
	}()
	s := cancelOnTermination()
	cmd := exec.Command("bash", "-c", "watch date &")
	go func() {
		<-s
		syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
	}()
	var outb, errb bytes.Buffer
	log.Printf("about to run - process has PID %d\n", os.Getpid())
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	cmd.Stdout = &outb
	cmd.Stderr = &errb
	if err := cmd.Run(); err != nil {
		log.Printf("err was not nil: %v", err)
	}
	log.Println("Process terminated normally")
}

func cancelOnTermination() chan os.Signal {
	log.Println("setting up a signal handler")
	s := make(chan os.Signal, 1)
	signal.Notify(s, syscall.SIGTERM)
	return s
}

Again, some additional complexity:

  • It puts the child process into a separate Process Group
  • The context is gone, as it doesn’t really do what I need; instead, I use the signal channel and a separate Goroutine that waits on a signal and SIGKILLs the child processes.

This needs to place the executed command into a new Process Group, because if I don’t, I’d kill the test-running binary (child processes inherit their parent’s process group).
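
A quick standalone check of that claim (purely illustrative, not part of runner.go): with Setpgid: true, the child’s process group id equals its own PID, so signalling the negative PID reaches it and its descendants without touching us.

package main

import (
	"fmt"
	"log"
	"os/exec"
	"syscall"
)

func main() {
	cmd := exec.Command("sleep", "5")
	// Put the child into its own process group.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}
	pgid, err := syscall.Getpgid(cmd.Process.Pid)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(pgid == cmd.Process.Pid)   // true: the child leads its own group
	fmt.Println(pgid == syscall.Getpgrp()) // false: it is no longer in our group
	cmd.Wait()
}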

$ go run runner.go
2019/02/20 08:05:22 setting up a signal handler
2019/02/20 08:05:22 about to run - process has PID 87370
2019/02/20 08:05:28 Process terminated normally
2019/02/20 08:05:28 executing the defer

This works: I can see that it terminates, and the defer fires.

A test-run spawns multiple processes during execution, and the main process could be terminated at any point; this leads to another issue.

Third version

func executeCommand(c string, s chan os.Signal) error {
	log.Printf("executing command %s\n", c)
	cmd := exec.Command("bash", "-c", c)
	go func() {
		<-s
        log.Println("killing a process")
		syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
	}()
	var outb, errb bytes.Buffer
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	cmd.Stdout = &outb
	cmd.Stderr = &errb
	if err := cmd.Run(); err != nil {
		log.Printf("err was not nil: %v", err)
		return err
	}
	return nil
}

func main() {
	defer func() {
		log.Println("executing the defer")
	}()
	s := cancelOnTermination()
	log.Printf("about to run - process has PID %d\n", os.Getpid())
	executeCommand("ls /var", s)
	executeCommand("watch date &", s)

	log.Println("Process terminated normally")
}

func cancelOnTermination() chan os.Signal {
	log.Println("setting up a signal handler")
	s := make(chan os.Signal, 1)
	signal.Notify(s, syscall.SIGTERM)
	return s
}

2019/02/20 08:15:53 setting up a signal handler
2019/02/20 08:15:53 about to run - process has PID 89770
2019/02/20 08:15:53 executing command ls /var
2019/02/20 08:15:53 executing command watch date &
2019/02/20 08:15:56 killing a process

SIGTERMing the process doesn’t appear to do much; the app hangs after logging that it’s killed a process.

If I send SIGTERM a second time, it logs that it’s killing a process again, and this time it appears to work:

2019/02/20 08:16:14 killing a process
2019/02/20 08:16:14 Process terminated normally
2019/02/20 08:16:14 executing the defer

What’s going on?

I’ve spawned two Goroutines to listen on the channel (one per command), and each value sent to a channel is delivered to just one of the waiting receivers. So the first SIGTERM can be consumed by the Goroutine watching the first command, whose process is already gone by then; there’s nothing to terminate that first Goroutine, so it hangs around to receive notifications for processes that are already complete.
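
Here’s a standalone sketch of that behaviour (not the runner code): each value sent on a channel is received by exactly one Goroutine, so a lingering listener for an already-finished command can swallow the only SIGTERM notification.

package main

import (
	"fmt"
	"time"
)

func main() {
	c := make(chan string)
	for i := 1; i <= 2; i++ {
		go func(id int) {
			// Only one of these listeners will ever see the value below.
			fmt.Printf("listener %d got %q\n", id, <-c)
		}(i)
	}
	c <- "SIGTERM" // delivered to exactly one of the two listeners
	time.Sleep(100 * time.Millisecond) // crude wait so the receiver can print
}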

Unfortunately, the dyno-runner doesn’t send SIGTERM more than once, so I need a better solution.

Decorating os/exec.Cmd

I created a simple struct to monitor Cmds.

// NewProcessGroupCmd creates a new ProcessGroupCmd to monitor a signal channel.
func NewProcessGroupCmd(c chan os.Signal, cmd *exec.Cmd) *ProcessGroupCmd {
	return &ProcessGroupCmd{
		sigChan: c,
		Cmd:     cmd,
		done:    make(chan bool, 1),
	}
}

// ProcessGroupCmd wraps an exec.Cmd and a channel.
type ProcessGroupCmd struct {
	sigChan chan os.Signal
	done    chan bool
	*exec.Cmd
}

// Run performs the equivalent of running the underlying Cmd.
//
// When spawning the command, it will be put into its own OS process group.
//
// If a value is received on the signal channel, the command _and_ all child
// processes will be killed (SIGKILL) by signalling the command's process
// group id.
func (grp *ProcessGroupCmd) Run() error {
	grp.Cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	if err := grp.Cmd.Start(); err != nil {
		return err
	}
	defer func() {
		close(grp.done)
	}()
	go func() {
		select {
		case <-grp.sigChan:
			syscall.Kill(-grp.Cmd.Process.Pid, syscall.SIGKILL)
			return
		case <-grp.done:
			return
		}
	}()
	return grp.Cmd.Wait()
}

How does it work?

	grp.Cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

This changes the process attributes to set the Process Group ID to the PID of the newly spawned process. This puts the newly spawned command (“bash”) into a separate process group, and the child processes of this (“watch date &”) will inherit this group id.

I need to call Start() to begin executing the child process (and have it take on the new group).

	defer func() {
		close(grp.done)
	}()

The ProcessGroupCmd creates a new channel for signalling that the process has finished. That way I can let the monitoring Goroutine know that it doesn’t need to worry about killing the process, which stops it listening on the signal channel.

	go func() {
		select {
		case <-grp.sigChan:
			syscall.Kill(-grp.Cmd.Process.Pid, syscall.SIGKILL)
			return
		case <-grp.done:
			return
		}
	}()

This is the magic: it waits for one of two things to happen. Either a value arrives on the signal channel, indicating that we’ve received a SIGTERM and should kill the child processes, or the grp.done channel is closed (which happens when the defer closes it), in which case the Goroutine exits.

This means that each command executed relinquishes its hold on the signal channel when it exits.

	return grp.Cmd.Wait()

Finally, this waits for the underlying Cmd to finish executing and returns whatever it returns (a possible error).

func executeCommand(c string, s chan os.Signal) error {
	log.Printf("executing command %s\n", c)
	cmd := exec.Command("bash", "-c", c)
	var outb, errb bytes.Buffer
	cmd.Stdout = &outb
	cmd.Stderr = &errb
	monitored := NewProcessGroupCmd(s, cmd)

	if err := monitored.Run(); err != nil {
		log.Printf("err was not nil: %v", err)
		return err
	}
	return nil
}

This approach allows configuration of the Cmd (setting the output streams, etc.), changes only the behaviour of the Run method, and requires minimal changes to callers.

Some tests ensure that it does what I want…

func TestKillStuckProcessEndsCommand(t *testing.T) {
	sigChan := make(chan os.Signal)
	// This spawns a child process (bash) which spawns another process which
	// goes into the background.
	cmd := createCommand("watch date &")
	grp := NewProcessGroupCmd(sigChan, cmd)
	go func() {
		time.Sleep(time.Millisecond * 500)
		sigChan <- syscall.Signal(syscall.SIGTERM)
	}()

	err := grp.Run()

	if err != nil {
		t.Fatal(err)
	}
}

func TestKillStuckProcessWithMultipleCommands(t *testing.T) {
	sigChan := make(chan os.Signal)
	grp1 := NewProcessGroupCmd(sigChan, createCommand("ls /var/tmp"))
	err := grp1.Run()
	if err != nil {
		t.Fatal(err)
	}

	go func() {
		time.Sleep(time.Millisecond * 500)
		sigChan <- syscall.Signal(syscall.SIGTERM)
	}()

	grp2 := NewProcessGroupCmd(sigChan, createCommand("watch date &"))
	err = grp2.Run()
	if err != nil {
		t.Fatal(err)
	}
}

func createCommand(s string) *exec.Cmd {
	// This spawns a child process (bash) which spawns another process which
	// goes into the background.
	cmd := exec.CommandContext(
		context.Background(),
		"bash", "-c", s,
	)
	var stdOut, stdErr bytes.Buffer
	cmd.Stdout, cmd.Stderr = &stdOut, &stdErr
	return cmd
}