Files
d-eyes/filedetection/iterator.go

173 lines
2.9 KiB
Go
Raw Normal View History

2023-11-06 16:31:16 +08:00
package filedetection
import (
"io"
"sync"
"github.com/targodan/go-errors"
)
type Iterator interface {
Next() (File, error)
Close() error
}
type concatIterator struct {
i int
iterators []Iterator
}
func concat(it1 Iterator, it2 Iterator) Iterator {
if it1 == nil {
return it2
}
if it2 == nil {
return it1
}
cit1, ok1 := it1.(*concatIterator)
cit2, ok2 := it2.(*concatIterator)
if ok1 && ok2 {
cit1.iterators = append(cit1.iterators, cit2.iterators...)
return cit1
}
if ok1 {
cit1.iterators = append(cit1.iterators, it2)
return cit1
}
if ok2 {
cit2.iterators = append(cit2.iterators, it1)
return cit2
}
return &concatIterator{
i: 0,
iterators: []Iterator{it1, it2},
}
}
func Concat(iterators ...Iterator) Iterator {
var ret Iterator
for _, it := range iterators {
ret = concat(ret, it)
}
return ret
}
func (it *concatIterator) Next() (File, error) {
if it.i >= len(it.iterators) {
return nil, io.EOF
}
f, err := it.iterators[it.i].Next()
if err == io.EOF {
it.i++
return it.Next()
}
return f, err
}
func (it *concatIterator) Close() error {
var err error
for _, iterator := range it.iterators {
err = errors.NewMultiError(err, iterator.Close())
}
return err
}
type concurrentIterator struct {
iterators []Iterator
c chan *nextEntry
wg *sync.WaitGroup
closed bool
}
func concurrent(it1 Iterator, it2 Iterator) Iterator {
if it1 == nil {
return it2
}
if it2 == nil {
return it1
}
cit1, ok1 := it1.(*concurrentIterator)
cit2, ok2 := it2.(*concurrentIterator)
if ok1 && ok2 {
panic("cannot combine two concurrent iterators")
}
if ok1 {
cit1.iterators = append(cit1.iterators, it2)
cit1.wg.Add(1)
go cit1.consume(len(cit1.iterators) - 1)
return cit1
}
if ok2 {
cit2.iterators = append(cit2.iterators, it1)
cit2.wg.Add(1)
go cit2.consume(len(cit2.iterators) - 1)
return cit2
}
cit := &concurrentIterator{
iterators: []Iterator{it1, it2},
c: make(chan *nextEntry),
wg: new(sync.WaitGroup),
}
cit.wg.Add(2)
go cit.consume(0)
go cit.consume(1)
go func() {
cit.wg.Wait()
close(cit.c)
}()
return cit
}
func Concurrent(iterators ...Iterator) Iterator {
var cit Iterator
for _, it := range iterators {
cit = concurrent(cit, it)
}
return cit
}
func (it *concurrentIterator) consume(i int) {
defer it.wg.Done()
for {
f, err := it.iterators[i].Next()
if err == io.EOF {
break
}
it.c <- &nextEntry{
File: f,
Err: err,
}
}
}
func (it *concurrentIterator) Next() (File, error) {
if it.closed {
return nil, io.EOF
}
next := <-it.c
if next == nil {
return nil, io.EOF
}
return next.File, next.Err
}
func (it *concurrentIterator) Close() error {
if it.closed {
return nil
}
it.closed = true
var err error
for _, iterator := range it.iterators {
err = errors.NewMultiError(err, iterator.Close())
}
return err
}