| package supervisor |
| |
| import ( |
| "sync" |
| "syscall" |
| |
| "github.com/Sirupsen/logrus" |
| "github.com/docker/containerd/runtime" |
| ) |
| |
| func NewMonitor() (*Monitor, error) { |
| m := &Monitor{ |
| receivers: make(map[int]interface{}), |
| exits: make(chan runtime.Process, 1024), |
| ooms: make(chan string, 1024), |
| } |
| fd, err := syscall.EpollCreate1(0) |
| if err != nil { |
| return nil, err |
| } |
| m.epollFd = fd |
| go m.start() |
| return m, nil |
| } |
| |
| type Monitor struct { |
| m sync.Mutex |
| receivers map[int]interface{} |
| exits chan runtime.Process |
| ooms chan string |
| epollFd int |
| } |
| |
| func (m *Monitor) Exits() chan runtime.Process { |
| return m.exits |
| } |
| |
| func (m *Monitor) OOMs() chan string { |
| return m.ooms |
| } |
| |
| func (m *Monitor) Monitor(p runtime.Process) error { |
| m.m.Lock() |
| defer m.m.Unlock() |
| fd := p.ExitFD() |
| event := syscall.EpollEvent{ |
| Fd: int32(fd), |
| Events: syscall.EPOLLHUP, |
| } |
| if err := syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil { |
| return err |
| } |
| EpollFdCounter.Inc(1) |
| m.receivers[fd] = p |
| return nil |
| } |
| |
| func (m *Monitor) MonitorOOM(c runtime.Container) error { |
| m.m.Lock() |
| defer m.m.Unlock() |
| o, err := c.OOM() |
| if err != nil { |
| return err |
| } |
| fd := o.FD() |
| event := syscall.EpollEvent{ |
| Fd: int32(fd), |
| Events: syscall.EPOLLHUP | syscall.EPOLLIN, |
| } |
| if err := syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil { |
| return err |
| } |
| EpollFdCounter.Inc(1) |
| m.receivers[fd] = o |
| return nil |
| } |
| |
| func (m *Monitor) Close() error { |
| return syscall.Close(m.epollFd) |
| } |
| |
| func (m *Monitor) start() { |
| var events [128]syscall.EpollEvent |
| for { |
| n, err := syscall.EpollWait(m.epollFd, events[:], -1) |
| if err != nil { |
| if err == syscall.EINTR { |
| continue |
| } |
| logrus.WithField("error", err).Fatal("containerd: epoll wait") |
| } |
| // process events |
| for i := 0; i < n; i++ { |
| fd := int(events[i].Fd) |
| m.m.Lock() |
| r := m.receivers[fd] |
| switch t := r.(type) { |
| case runtime.Process: |
| if events[i].Events == syscall.EPOLLHUP { |
| delete(m.receivers, fd) |
| if err = syscall.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{ |
| Events: syscall.EPOLLHUP, |
| Fd: int32(fd), |
| }); err != nil { |
| logrus.WithField("error", err).Error("containerd: epoll remove fd") |
| } |
| if err := t.Close(); err != nil { |
| logrus.WithField("error", err).Error("containerd: close process IO") |
| } |
| EpollFdCounter.Dec(1) |
| m.exits <- t |
| } |
| case runtime.OOM: |
| // always flush the event fd |
| t.Flush() |
| if t.Removed() { |
| delete(m.receivers, fd) |
| // epoll will remove the fd from its set after it has been closed |
| t.Close() |
| EpollFdCounter.Dec(1) |
| } else { |
| m.ooms <- t.ContainerID() |
| } |
| } |
| m.m.Unlock() |
| } |
| } |
| } |