diff --git a/README.md b/README.md new file mode 100644 index 0000000..6dec2cd --- /dev/null +++ b/README.md @@ -0,0 +1,35 @@ +# mediawatcher + +`mediawatcher` is a small Go daemon that: + +1. Watches one or more download directories +2. Waits for files to finish writing +3. Classifies them as TV / Movie / Misc / Unknown +4. Moves them into a structured incoming tree +5. Optionally rsyncs them to remote media servers +6. Optionally notifies Sonarr/Radarr (or any HTTP endpoint) to rescan + +## Build + +```bash +cd mediawatcher +go build ./cmd/mediawatcher +``` + +## Config + +Copy the example config and edit: + +```bash +cp mediawatcher.example.yml mediawatcher.yml +``` + +Edit `watch.dirs`, `structure.*`, `sync.targets`, and `notifier.endpoints`. + +## Run + +```bash +./mediawatcher -config=mediawatcher.yml +``` + +Or as a systemd service (see `systemd/mediawatcher.service`). diff --git a/cmd/mediawatcher/main.go b/cmd/mediawatcher/main.go index e69de29..a427d60 100644 --- a/cmd/mediawatcher/main.go +++ b/cmd/mediawatcher/main.go @@ -0,0 +1,71 @@ +package main + +import ( + "flag" + "log" + "os" + "os/signal" + "syscall" + "time" + + "mediawatcher/internal/classifier" + "mediawatcher/internal/config" + "mediawatcher/internal/mover" + "mediawatcher/internal/notifier" + "mediawatcher/internal/syncer" + "mediawatcher/internal/watcher" +) + +func main() { + cfgPath := flag.String("config", "mediawatcher.yml", "path to config file") + flag.Parse() + + cfg, err := config.Load(*cfgPath) + if err != nil { + log.Fatalf("failed to load config: %v", err) + } + + w, err := watcher.New(cfg) + if err != nil { + log.Fatalf("failed to create watcher: %v", err) + } + defer w.Close() + + sync := syncer.New(&cfg.Sync) + notify := notifier.New(&cfg.Notifier) + + handler := func(path string) { + log.Printf("[main] detected stable file: %s", path) + + res := classifier.Classify(path, cfg) + log.Printf("[main] classified %s as %s", path, res.Type) + + destPath, err := mover.Move(path, res.DestDir, res.DestName) + if err != nil { + log.Printf("[main] move error: %v", err) + return + } + res.SourcePath = destPath + + if sync != nil { + sync.Sync(res) + } + if notify != nil { + notify.Notify(res) + } + } + + if err := w.Start(handler); err != nil { + log.Fatalf("failed to start watcher: %v", err) + } + + log.Println("mediawatcher started") + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + sig := <-sigCh + log.Printf("received signal %s, shutting down", sig) + + time.Sleep(1 * time.Second) +} diff --git a/go.mod b/go.mod index 062908f..332ad5d 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,10 @@ -module belmontpt.org/m +module mediawatcher -go 1.25.1 +go 1.22 + +require ( + github.com/fsnotify/fsnotify v1.7.0 + gopkg.in/yaml.v3 v3.0.1 +) + +require golang.org/x/sys v0.4.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..37fe95d --- /dev/null +++ b/go.sum @@ -0,0 +1,8 @@ +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/classifier/classifier.go b/internal/classifier/classifier.go index 0b09c57..5343696 100644 --- a/internal/classifier/classifier.go +++ b/internal/classifier/classifier.go @@ -1,27 +1,112 @@ package classifier import ( + "fmt" "path/filepath" "regexp" + "strconv" "strings" + + "mediawatcher/internal/config" ) -var movieYear = regexp.MustCompile(`(?i)(19|20)\d{2}`) +type Type string -func classifyMovie(filename string) (bool, string, int) { - base := strings.TrimSuffix(filepath.Base(filename), filepath.Ext(filename)) +const ( + TypeTV Type = "tv" + TypeMovie Type = "movie" + TypeMisc Type = "misc" + TypeUnknown Type = "unknown" +) - // Find year - yearMatch := movieYear.FindString(base) - if yearMatch == "" { - return false, "", 0 +type Result struct { + Type Type + Title string + Season int + Episode int + Year int + DestDir string + DestName string + SourcePath string +} + +// regexes +var ( + tvPattern = regexp.MustCompile(`(?i)(S?(\d{1,2}))[ ._-]*[Ex](\d{1,3})`) + yearPattern = regexp.MustCompile(`\b(19\d{2}|20[0-3]\d)\b`) +) + +func Classify(path string, cfg *config.Config) Result { + base := filepath.Base(path) + nameNoExt := strings.TrimSuffix(base, filepath.Ext(base)) + + // TV first + if m := tvPattern.FindStringSubmatch(nameNoExt); len(m) == 4 { + season := atoiSafe(m[2]) + episode := atoiSafe(m[3]) + + cleaned := tvPattern.ReplaceAllString(nameNoExt, "") + title := cleanupTitle(cleaned) + + destDir := filepath.Join(cfg.Structure.TVDir, title, fmt.Sprintf("Season %02d", season)) + + return Result{ + Type: TypeTV, + Title: title, + Season: season, + Episode: episode, + DestDir: destDir, + DestName: base, + SourcePath: path, + } } - year := parseInt(yearMatch) + // Movie + if ym := yearPattern.FindString(nameNoExt); ym != "" { + year := atoiSafe(ym) + cleaned := strings.Replace(nameNoExt, ym, "", 1) + title := cleanupTitle(cleaned) + destDir := filepath.Join(cfg.Structure.MoviesDir, fmt.Sprintf("%s (%d)", title, year)) - // Extract title (everything before year) - title := strings.TrimSpace(strings.Replace(base, yearMatch, "", 1)) - title = cleanupTitle(title) + return Result{ + Type: TypeMovie, + Title: title, + Year: year, + DestDir: destDir, + DestName: base, + SourcePath: path, + } + } - return true, title, year + // Misc based on extension + ext := strings.ToLower(filepath.Ext(base)) + if ext == ".mp3" || ext == ".flac" || ext == ".m4a" || ext == ".aac" { + return Result{ + Type: TypeMisc, + DestDir: cfg.Structure.MiscDir, + DestName: base, + SourcePath: path, + } + } + + // Unknown + return Result{ + Type: TypeUnknown, + DestDir: cfg.Structure.UnknownDir, + DestName: base, + SourcePath: path, + } +} + +func cleanupTitle(s string) string { + s = strings.ReplaceAll(s, ".", " ") + s = strings.ReplaceAll(s, "_", " ") + s = strings.ReplaceAll(s, "-", " ") + s = strings.Join(strings.Fields(s), " ") + return strings.TrimSpace(s) +} + +func atoiSafe(s string) int { + n, _ := strconv.Atoi(s) + return n } diff --git a/internal/config/config.go b/internal/config/config.go index d0eb57f..b23241a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,10 +1,19 @@ package config import ( + "fmt" "os" + "gopkg.in/yaml.v3" ) +type WatchConfig struct { + Dirs []string `yaml:"dirs"` + StableSeconds int `yaml:"stable_seconds"` + IncludeExt []string `yaml:"include_ext"` + ExcludeExt []string `yaml:"exclude_ext"` +} + type StructureConfig struct { MoviesDir string `yaml:"movies_dir"` TVDir string `yaml:"tv_dir"` @@ -13,24 +22,75 @@ type StructureConfig struct { AutoCreate bool `yaml:"auto_create"` } -type WatchConfig struct { - Dirs []string `yaml:"dirs"` +type RsyncTarget struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + User string `yaml:"user"` + DestBase string `yaml:"dest_base"` + ExtraArgs []string `yaml:"extra_args"` +} + +type SyncConfig struct { + Enabled bool `yaml:"enabled"` + RsyncBinary string `yaml:"rsync_binary"` + Targets []RsyncTarget `yaml:"targets"` +} + +type NotifierEndpoint struct { + Name string `yaml:"name"` + Method string `yaml:"method"` + URL string `yaml:"url"` + Headers map[string]string `yaml:"headers"` + Body string `yaml:"body"` +} + +type NotifierConfig struct { + Enabled bool `yaml:"enabled"` + Endpoints []NotifierEndpoint `yaml:"endpoints"` +} + +type LoggingConfig struct { + Level string `yaml:"level"` } type Config struct { Watch WatchConfig `yaml:"watch"` Structure StructureConfig `yaml:"structure"` + Sync SyncConfig `yaml:"sync"` + Notifier NotifierConfig `yaml:"notifier"` + Logging LoggingConfig `yaml:"logging"` } -func LoadConfig(path string) (*Config, error) { +func Load(path string) (*Config, error) { data, err := os.ReadFile(path) if err != nil { - return nil, err + return nil, fmt.Errorf("read config: %w", err) } var cfg Config if err := yaml.Unmarshal(data, &cfg); err != nil { - return nil, err + return nil, fmt.Errorf("parse yaml: %w", err) + } + + if cfg.Watch.StableSeconds <= 0 { + cfg.Watch.StableSeconds = 15 + } + + if cfg.Sync.RsyncBinary == "" { + cfg.Sync.RsyncBinary = "/usr/bin/rsync" + } + + if cfg.Structure.MoviesDir == "" { + cfg.Structure.MoviesDir = "/srv/media/incoming/movies" + } + if cfg.Structure.TVDir == "" { + cfg.Structure.TVDir = "/srv/media/incoming/tv" + } + if cfg.Structure.MiscDir == "" { + cfg.Structure.MiscDir = "/srv/media/incoming/misc" + } + if cfg.Structure.UnknownDir == "" { + cfg.Structure.UnknownDir = "/srv/media/incoming/unknown" } return &cfg, nil diff --git a/internal/mover/mover.go b/internal/mover/mover.go index e69de29..e28cef5 100644 --- a/internal/mover/mover.go +++ b/internal/mover/mover.go @@ -0,0 +1,19 @@ +package mover + +import ( + "fmt" + "os" + "path/filepath" +) + +// Move moves the file to destDir/destName, creating directories as needed. +func Move(src, destDir, destName string) (string, error) { + if err := os.MkdirAll(destDir, 0o755); err != nil { + return "", fmt.Errorf("mkdir %s: %w", destDir, err) + } + destPath := filepath.Join(destDir, destName) + if err := os.Rename(src, destPath); err != nil { + return "", fmt.Errorf("rename %s -> %s: %w", src, destPath, err) + } + return destPath, nil +} diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go new file mode 100644 index 0000000..56df328 --- /dev/null +++ b/internal/notifier/notifier.go @@ -0,0 +1,70 @@ +package notifier + +import ( + "bytes" + "log" + "net/http" + "time" + + "mediawatcher/internal/classifier" + "mediawatcher/internal/config" +) + +type Notifier struct { + cfg *config.NotifierConfig + cli *http.Client +} + +func New(cfg *config.NotifierConfig) *Notifier { + if cfg == nil || !cfg.Enabled { + return nil + } + return &Notifier{ + cfg: cfg, + cli: &http.Client{Timeout: 10 * time.Second}, + } +} + +func (n *Notifier) Notify(res classifier.Result) { + if n == nil || !n.cfg.Enabled { + return + } + for _, ep := range n.cfg.Endpoints { + if err := n.callEndpoint(ep, res); err != nil { + log.Printf("[notify] %s error: %v", ep.Name, err) + } + } +} + +func (n *Notifier) callEndpoint(ep config.NotifierEndpoint, res classifier.Result) error { + if ep.URL == "" { + return nil + } + method := ep.Method + if method == "" { + method = http.MethodPost + } + + body := []byte(ep.Body) + req, err := http.NewRequest(method, ep.URL, bytes.NewReader(body)) + if err != nil { + return err + } + for k, v := range ep.Headers { + req.Header.Set(k, v) + } + if req.Header.Get("Content-Type") == "" && len(body) > 0 { + req.Header.Set("Content-Type", "application/json") + } + + resp, err := n.cli.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode >= 300 { + log.Printf("[notify] %s returned status %d", ep.Name, resp.StatusCode) + } + return nil +} diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go new file mode 100644 index 0000000..54d8935 --- /dev/null +++ b/internal/syncer/syncer.go @@ -0,0 +1,81 @@ +package syncer + +import ( + "bytes" + "fmt" + "log" + "os/exec" + "path/filepath" + + "mediawatcher/internal/classifier" + "mediawatcher/internal/config" +) + +type Syncer struct { + cfg *config.SyncConfig +} + +func New(cfg *config.SyncConfig) *Syncer { + if cfg == nil || !cfg.Enabled { + return nil + } + return &Syncer{cfg: cfg} +} + +func (s *Syncer) Sync(res classifier.Result) { + if s == nil || !s.cfg.Enabled { + return + } + + for _, t := range s.cfg.Targets { + if err := s.syncToTarget(t, res); err != nil { + log.Printf("[sync] error syncing to %s: %v", t.Host, err) + } + } +} + +func (s *Syncer) syncToTarget(t config.RsyncTarget, res classifier.Result) error { + if t.Host == "" || t.DestBase == "" { + return fmt.Errorf("invalid target config: host and dest_base required") + } + + user := t.User + if user == "" { + user = "media" + } + port := t.Port + if port == 0 { + port = 22 + } + + // Recreate relative dest under DestBase + rel, err := filepath.Rel(res.DestDir, filepath.Join(res.DestDir, res.DestName)) + if err != nil { + rel = res.DestName + } + + remoteDir := t.DestBase + remote := fmt.Sprintf("%s@%s:%s/", user, t.Host, remoteDir) + + args := []string{"-avh"} + if port != 22 { + args = append(args, "-e", fmt.Sprintf("ssh -p %d", port)) + } + args = append(args, t.ExtraArgs...) + args = append(args, filepath.Join(res.DestDir, res.DestName), remote) + + log.Printf("[sync] rsync %v", args) + + cmd := exec.Command(s.cfg.RsyncBinary, args...) + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + log.Printf("[sync] stdout: %s", stdout.String()) + log.Printf("[sync] stderr: %s", stderr.String()) + return fmt.Errorf("rsync failed: %w", err) + } + + return nil +} diff --git a/internal/util/fileutil.go b/internal/util/fileutil.go new file mode 100644 index 0000000..34ba786 --- /dev/null +++ b/internal/util/fileutil.go @@ -0,0 +1,60 @@ +package util + +import ( + "os" + "path/filepath" + "strings" + "time" +) + +// HasExt reports whether path has one of the provided extensions (case-insensitive, with or without dot). +func HasExt(path string, exts []string) bool { + if len(exts) == 0 { + return true + } + ext := strings.ToLower(filepath.Ext(path)) + for _, e := range exts { + e = strings.ToLower(e) + if !strings.HasPrefix(e, ".") { + e = "." + e + } + if ext == e { + return true + } + } + return false +} + +// WaitForStable waits until the file size has not changed for stableFor duration. +func WaitForStable(path string, stableFor time.Duration, timeout time.Duration) error { + start := time.Now() + var lastSize int64 = -1 + var stableStart time.Time + + for { + info, err := os.Stat(path) + if err != nil { + return err + } + size := info.Size() + + if size == lastSize { + if stableStart.IsZero() { + stableStart = time.Now() + } + if time.Since(stableStart) >= stableFor { + return nil + } + } else { + stableStart = time.Time{} + } + + lastSize = size + + if time.Since(start) > timeout { + return os.ErrDeadlineExceeded + } + + time.Sleep(2 * time.Second) + } +} diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index 280da3c..768c326 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -2,33 +2,125 @@ package watcher import ( "log" + "os" + "path/filepath" + "sync" + "time" + "github.com/fsnotify/fsnotify" + + "mediawatcher/internal/config" + "mediawatcher/internal/util" ) type FileHandler func(path string) -func WatchDirs(dirs []string, handler FileHandler) error { - watcher, err := fsnotify.NewWatcher() - if err != nil { - return err - } - defer watcher.Close() +type Watcher struct { + cfg *config.Config + watcher *fsnotify.Watcher + pending map[string]struct{} + mu sync.Mutex +} - for _, d := range dirs { - if err := watcher.Add(d); err != nil { +func New(cfg *config.Config) (*Watcher, error) { + w, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + return &Watcher{ + cfg: cfg, + watcher: w, + pending: make(map[string]struct{}), + }, nil +} + +func (w *Watcher) Close() error { + return w.watcher.Close() +} + +func (w *Watcher) Start(handler FileHandler) error { + for _, dir := range w.cfg.Watch.Dirs { + if err := w.addDir(dir); err != nil { return err } } + go w.loop(handler) + return nil +} + +func (w *Watcher) addDir(dir string) error { + info, err := os.Stat(dir) + if err != nil { + return err + } + if !info.IsDir() { + return nil + } + + log.Printf("[watch] watching %s", dir) + return w.watcher.Add(dir) +} + +func (w *Watcher) loop(handler FileHandler) { for { select { - case event := <-watcher.Events: - if event.Op&(fsnotify.Create|fsnotify.Write) != 0 { - handler(event.Name) + case event, ok := <-w.watcher.Events: + if !ok { + return + } + if event.Op&(fsnotify.Create|fsnotify.Write|fsnotify.Rename) == 0 { + continue } - case err := <-watcher.Errors: - log.Printf("Watcher error: %v\n", err) + path := event.Name + info, err := os.Stat(path) + if err != nil || info.IsDir() { + continue + } + + extIncl := util.HasExt(path, w.cfg.Watch.IncludeExt) + extExcl := util.HasExt(path, w.cfg.Watch.ExcludeExt) + if !extIncl || extExcl { + continue + } + + w.mu.Lock() + if _, exists := w.pending[path]; exists { + w.mu.Unlock() + continue + } + w.pending[path] = struct{}{} + w.mu.Unlock() + + go w.handleFile(path, handler) + + case err, ok := <-w.watcher.Errors: + if !ok { + return + } + log.Printf("[watch] error: %v", err) } } } + +func (w *Watcher) handleFile(path string, handler FileHandler) { + defer func() { + w.mu.Lock() + delete(w.pending, path) + w.mu.Unlock() + }() + + stableFor := time.Duration(w.cfg.Watch.StableSeconds) * time.Second + if err := util.WaitForStable(path, stableFor, 2*time.Hour); err != nil { + log.Printf("[watch] file %s not stable: %v", path, err) + return + } + + abs, err := filepath.Abs(path) + if err != nil { + abs = path + } + + handler(abs) +} diff --git a/mediawatcher.example.yml b/mediawatcher.example.yml new file mode 100644 index 0000000..e11c155 --- /dev/null +++ b/mediawatcher.example.yml @@ -0,0 +1,52 @@ +watch: + dirs: + - "/srv/downloads/incoming" + stable_seconds: 15 + include_ext: + - ".mkv" + - ".mp4" + - ".avi" + - ".mp3" + - ".flac" + exclude_ext: + - ".part" + - ".tmp" + - ".crdownload" + +structure: + movies_dir: "/srv/media/incoming/movies" + tv_dir: "/srv/media/incoming/tv" + misc_dir: "/srv/media/incoming/misc" + unknown_dir: "/srv/media/incoming/unknown" + auto_create: true + +sync: + enabled: true + rsync_binary: "/usr/bin/rsync" + targets: + - host: "200:ygg:media::1" + port: 22 + user: "media" + dest_base: "/srv/media/incoming" + extra_args: + - "--partial" + - "--inplace" + +notifier: + enabled: true + endpoints: + - name: "sonarr" + method: "POST" + url: "http://sonarr.local:8989/api/v3/command" + headers: + X-Api-Key: "YOUR_SONARR_API_KEY" + body: '{"name":"RescanFolders"}' + - name: "radarr" + method: "POST" + url: "http://radarr.local:7878/api/v3/command" + headers: + X-Api-Key: "YOUR_RADARR_API_KEY" + body: '{"name":"RescanFolders"}' + +logging: + level: "info" diff --git a/scripts/build-all.sh b/scripts/build-all.sh new file mode 100755 index 0000000..1ed25f9 --- /dev/null +++ b/scripts/build-all.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd) +cd "$ROOT_DIR" + +echo "Building linux/amd64..." +GOOS=linux GOARCH=amd64 go build -o bin/mediawatcher-linux-amd64 ./cmd/mediawatcher + +echo "Building linux/arm64..." +GOOS=linux GOARCH=arm64 go build -o bin/mediawatcher-linux-arm64 ./cmd/mediawatcher + +echo "Done. Binaries in ./bin" diff --git a/systemd/mediawatcher.service b/systemd/mediawatcher.service new file mode 100644 index 0000000..6582e26 --- /dev/null +++ b/systemd/mediawatcher.service @@ -0,0 +1,16 @@ +[Unit] +Description=MediaWatcher Daemon +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +User=media +Group=media +WorkingDirectory=/opt/mediawatcher +ExecStart=/opt/mediawatcher/mediawatcher -config=/opt/mediawatcher/mediawatcher.yml +Restart=always +RestartSec=5 + +[Install] +WantedBy=multi-user.target