more initial changes

This commit is contained in:
Javis Sullivan
2025-12-08 07:56:46 -05:00
parent 0e0fe0e37c
commit 4bed19ab73
14 changed files with 701 additions and 32 deletions

View File

@@ -1,27 +1,112 @@
package classifier
import (
"fmt"
"path/filepath"
"regexp"
"strconv"
"strings"
"mediawatcher/internal/config"
)
var movieYear = regexp.MustCompile(`(?i)(19|20)\d{2}`)
type Type string
func classifyMovie(filename string) (bool, string, int) {
base := strings.TrimSuffix(filepath.Base(filename), filepath.Ext(filename))
const (
TypeTV Type = "tv"
TypeMovie Type = "movie"
TypeMisc Type = "misc"
TypeUnknown Type = "unknown"
)
// Find year
yearMatch := movieYear.FindString(base)
if yearMatch == "" {
return false, "", 0
type Result struct {
Type Type
Title string
Season int
Episode int
Year int
DestDir string
DestName string
SourcePath string
}
// regexes
var (
tvPattern = regexp.MustCompile(`(?i)(S?(\d{1,2}))[ ._-]*[Ex](\d{1,3})`)
yearPattern = regexp.MustCompile(`\b(19\d{2}|20[0-3]\d)\b`)
)
func Classify(path string, cfg *config.Config) Result {
base := filepath.Base(path)
nameNoExt := strings.TrimSuffix(base, filepath.Ext(base))
// TV first
if m := tvPattern.FindStringSubmatch(nameNoExt); len(m) == 4 {
season := atoiSafe(m[2])
episode := atoiSafe(m[3])
cleaned := tvPattern.ReplaceAllString(nameNoExt, "")
title := cleanupTitle(cleaned)
destDir := filepath.Join(cfg.Structure.TVDir, title, fmt.Sprintf("Season %02d", season))
return Result{
Type: TypeTV,
Title: title,
Season: season,
Episode: episode,
DestDir: destDir,
DestName: base,
SourcePath: path,
}
}
year := parseInt(yearMatch)
// Movie
if ym := yearPattern.FindString(nameNoExt); ym != "" {
year := atoiSafe(ym)
cleaned := strings.Replace(nameNoExt, ym, "", 1)
title := cleanupTitle(cleaned)
destDir := filepath.Join(cfg.Structure.MoviesDir, fmt.Sprintf("%s (%d)", title, year))
// Extract title (everything before year)
title := strings.TrimSpace(strings.Replace(base, yearMatch, "", 1))
title = cleanupTitle(title)
return Result{
Type: TypeMovie,
Title: title,
Year: year,
DestDir: destDir,
DestName: base,
SourcePath: path,
}
}
return true, title, year
// Misc based on extension
ext := strings.ToLower(filepath.Ext(base))
if ext == ".mp3" || ext == ".flac" || ext == ".m4a" || ext == ".aac" {
return Result{
Type: TypeMisc,
DestDir: cfg.Structure.MiscDir,
DestName: base,
SourcePath: path,
}
}
// Unknown
return Result{
Type: TypeUnknown,
DestDir: cfg.Structure.UnknownDir,
DestName: base,
SourcePath: path,
}
}
func cleanupTitle(s string) string {
s = strings.ReplaceAll(s, ".", " ")
s = strings.ReplaceAll(s, "_", " ")
s = strings.ReplaceAll(s, "-", " ")
s = strings.Join(strings.Fields(s), " ")
return strings.TrimSpace(s)
}
func atoiSafe(s string) int {
n, _ := strconv.Atoi(s)
return n
}

View File

@@ -1,10 +1,19 @@
package config
import (
"fmt"
"os"
"gopkg.in/yaml.v3"
)
type WatchConfig struct {
Dirs []string `yaml:"dirs"`
StableSeconds int `yaml:"stable_seconds"`
IncludeExt []string `yaml:"include_ext"`
ExcludeExt []string `yaml:"exclude_ext"`
}
type StructureConfig struct {
MoviesDir string `yaml:"movies_dir"`
TVDir string `yaml:"tv_dir"`
@@ -13,24 +22,75 @@ type StructureConfig struct {
AutoCreate bool `yaml:"auto_create"`
}
type WatchConfig struct {
Dirs []string `yaml:"dirs"`
type RsyncTarget struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
User string `yaml:"user"`
DestBase string `yaml:"dest_base"`
ExtraArgs []string `yaml:"extra_args"`
}
type SyncConfig struct {
Enabled bool `yaml:"enabled"`
RsyncBinary string `yaml:"rsync_binary"`
Targets []RsyncTarget `yaml:"targets"`
}
type NotifierEndpoint struct {
Name string `yaml:"name"`
Method string `yaml:"method"`
URL string `yaml:"url"`
Headers map[string]string `yaml:"headers"`
Body string `yaml:"body"`
}
type NotifierConfig struct {
Enabled bool `yaml:"enabled"`
Endpoints []NotifierEndpoint `yaml:"endpoints"`
}
type LoggingConfig struct {
Level string `yaml:"level"`
}
type Config struct {
Watch WatchConfig `yaml:"watch"`
Structure StructureConfig `yaml:"structure"`
Sync SyncConfig `yaml:"sync"`
Notifier NotifierConfig `yaml:"notifier"`
Logging LoggingConfig `yaml:"logging"`
}
func LoadConfig(path string) (*Config, error) {
func Load(path string) (*Config, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
return nil, fmt.Errorf("read config: %w", err)
}
var cfg Config
if err := yaml.Unmarshal(data, &cfg); err != nil {
return nil, err
return nil, fmt.Errorf("parse yaml: %w", err)
}
if cfg.Watch.StableSeconds <= 0 {
cfg.Watch.StableSeconds = 15
}
if cfg.Sync.RsyncBinary == "" {
cfg.Sync.RsyncBinary = "/usr/bin/rsync"
}
if cfg.Structure.MoviesDir == "" {
cfg.Structure.MoviesDir = "/srv/media/incoming/movies"
}
if cfg.Structure.TVDir == "" {
cfg.Structure.TVDir = "/srv/media/incoming/tv"
}
if cfg.Structure.MiscDir == "" {
cfg.Structure.MiscDir = "/srv/media/incoming/misc"
}
if cfg.Structure.UnknownDir == "" {
cfg.Structure.UnknownDir = "/srv/media/incoming/unknown"
}
return &cfg, nil

View File

@@ -0,0 +1,19 @@
package mover
import (
"fmt"
"os"
"path/filepath"
)
// Move moves the file to destDir/destName, creating directories as needed.
func Move(src, destDir, destName string) (string, error) {
if err := os.MkdirAll(destDir, 0o755); err != nil {
return "", fmt.Errorf("mkdir %s: %w", destDir, err)
}
destPath := filepath.Join(destDir, destName)
if err := os.Rename(src, destPath); err != nil {
return "", fmt.Errorf("rename %s -> %s: %w", src, destPath, err)
}
return destPath, nil
}

View File

@@ -0,0 +1,70 @@
package notifier
import (
"bytes"
"log"
"net/http"
"time"
"mediawatcher/internal/classifier"
"mediawatcher/internal/config"
)
type Notifier struct {
cfg *config.NotifierConfig
cli *http.Client
}
func New(cfg *config.NotifierConfig) *Notifier {
if cfg == nil || !cfg.Enabled {
return nil
}
return &Notifier{
cfg: cfg,
cli: &http.Client{Timeout: 10 * time.Second},
}
}
func (n *Notifier) Notify(res classifier.Result) {
if n == nil || !n.cfg.Enabled {
return
}
for _, ep := range n.cfg.Endpoints {
if err := n.callEndpoint(ep, res); err != nil {
log.Printf("[notify] %s error: %v", ep.Name, err)
}
}
}
func (n *Notifier) callEndpoint(ep config.NotifierEndpoint, res classifier.Result) error {
if ep.URL == "" {
return nil
}
method := ep.Method
if method == "" {
method = http.MethodPost
}
body := []byte(ep.Body)
req, err := http.NewRequest(method, ep.URL, bytes.NewReader(body))
if err != nil {
return err
}
for k, v := range ep.Headers {
req.Header.Set(k, v)
}
if req.Header.Get("Content-Type") == "" && len(body) > 0 {
req.Header.Set("Content-Type", "application/json")
}
resp, err := n.cli.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
log.Printf("[notify] %s returned status %d", ep.Name, resp.StatusCode)
}
return nil
}

81
internal/syncer/syncer.go Normal file
View File

@@ -0,0 +1,81 @@
package syncer
import (
"bytes"
"fmt"
"log"
"os/exec"
"path/filepath"
"mediawatcher/internal/classifier"
"mediawatcher/internal/config"
)
type Syncer struct {
cfg *config.SyncConfig
}
func New(cfg *config.SyncConfig) *Syncer {
if cfg == nil || !cfg.Enabled {
return nil
}
return &Syncer{cfg: cfg}
}
func (s *Syncer) Sync(res classifier.Result) {
if s == nil || !s.cfg.Enabled {
return
}
for _, t := range s.cfg.Targets {
if err := s.syncToTarget(t, res); err != nil {
log.Printf("[sync] error syncing to %s: %v", t.Host, err)
}
}
}
func (s *Syncer) syncToTarget(t config.RsyncTarget, res classifier.Result) error {
if t.Host == "" || t.DestBase == "" {
return fmt.Errorf("invalid target config: host and dest_base required")
}
user := t.User
if user == "" {
user = "media"
}
port := t.Port
if port == 0 {
port = 22
}
// Recreate relative dest under DestBase
rel, err := filepath.Rel(res.DestDir, filepath.Join(res.DestDir, res.DestName))
if err != nil {
rel = res.DestName
}
remoteDir := t.DestBase
remote := fmt.Sprintf("%s@%s:%s/", user, t.Host, remoteDir)
args := []string{"-avh"}
if port != 22 {
args = append(args, "-e", fmt.Sprintf("ssh -p %d", port))
}
args = append(args, t.ExtraArgs...)
args = append(args, filepath.Join(res.DestDir, res.DestName), remote)
log.Printf("[sync] rsync %v", args)
cmd := exec.Command(s.cfg.RsyncBinary, args...)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
log.Printf("[sync] stdout: %s", stdout.String())
log.Printf("[sync] stderr: %s", stderr.String())
return fmt.Errorf("rsync failed: %w", err)
}
return nil
}

60
internal/util/fileutil.go Normal file
View File

@@ -0,0 +1,60 @@
package util
import (
"os"
"path/filepath"
"strings"
"time"
)
// HasExt reports whether path has one of the provided extensions (case-insensitive, with or without dot).
func HasExt(path string, exts []string) bool {
if len(exts) == 0 {
return true
}
ext := strings.ToLower(filepath.Ext(path))
for _, e := range exts {
e = strings.ToLower(e)
if !strings.HasPrefix(e, ".") {
e = "." + e
}
if ext == e {
return true
}
}
return false
}
// WaitForStable waits until the file size has not changed for stableFor duration.
func WaitForStable(path string, stableFor time.Duration, timeout time.Duration) error {
start := time.Now()
var lastSize int64 = -1
var stableStart time.Time
for {
info, err := os.Stat(path)
if err != nil {
return err
}
size := info.Size()
if size == lastSize {
if stableStart.IsZero() {
stableStart = time.Now()
}
if time.Since(stableStart) >= stableFor {
return nil
}
} else {
stableStart = time.Time{}
}
lastSize = size
if time.Since(start) > timeout {
return os.ErrDeadlineExceeded
}
time.Sleep(2 * time.Second)
}
}

View File

@@ -2,33 +2,125 @@ package watcher
import (
"log"
"os"
"path/filepath"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"mediawatcher/internal/config"
"mediawatcher/internal/util"
)
type FileHandler func(path string)
func WatchDirs(dirs []string, handler FileHandler) error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer watcher.Close()
type Watcher struct {
cfg *config.Config
watcher *fsnotify.Watcher
pending map[string]struct{}
mu sync.Mutex
}
for _, d := range dirs {
if err := watcher.Add(d); err != nil {
func New(cfg *config.Config) (*Watcher, error) {
w, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
return &Watcher{
cfg: cfg,
watcher: w,
pending: make(map[string]struct{}),
}, nil
}
func (w *Watcher) Close() error {
return w.watcher.Close()
}
func (w *Watcher) Start(handler FileHandler) error {
for _, dir := range w.cfg.Watch.Dirs {
if err := w.addDir(dir); err != nil {
return err
}
}
go w.loop(handler)
return nil
}
func (w *Watcher) addDir(dir string) error {
info, err := os.Stat(dir)
if err != nil {
return err
}
if !info.IsDir() {
return nil
}
log.Printf("[watch] watching %s", dir)
return w.watcher.Add(dir)
}
func (w *Watcher) loop(handler FileHandler) {
for {
select {
case event := <-watcher.Events:
if event.Op&(fsnotify.Create|fsnotify.Write) != 0 {
handler(event.Name)
case event, ok := <-w.watcher.Events:
if !ok {
return
}
if event.Op&(fsnotify.Create|fsnotify.Write|fsnotify.Rename) == 0 {
continue
}
case err := <-watcher.Errors:
log.Printf("Watcher error: %v\n", err)
path := event.Name
info, err := os.Stat(path)
if err != nil || info.IsDir() {
continue
}
extIncl := util.HasExt(path, w.cfg.Watch.IncludeExt)
extExcl := util.HasExt(path, w.cfg.Watch.ExcludeExt)
if !extIncl || extExcl {
continue
}
w.mu.Lock()
if _, exists := w.pending[path]; exists {
w.mu.Unlock()
continue
}
w.pending[path] = struct{}{}
w.mu.Unlock()
go w.handleFile(path, handler)
case err, ok := <-w.watcher.Errors:
if !ok {
return
}
log.Printf("[watch] error: %v", err)
}
}
}
func (w *Watcher) handleFile(path string, handler FileHandler) {
defer func() {
w.mu.Lock()
delete(w.pending, path)
w.mu.Unlock()
}()
stableFor := time.Duration(w.cfg.Watch.StableSeconds) * time.Second
if err := util.WaitForStable(path, stableFor, 2*time.Hour); err != nil {
log.Printf("[watch] file %s not stable: %v", path, err)
return
}
abs, err := filepath.Abs(path)
if err != nil {
abs = path
}
handler(abs)
}