[GO] Duda sobre orden en patrón

desu

tengo una duda de go, no entiendo el orden en este patrón

https://github.com/lotusirous/go-concurrency-patterns/blob/main/15-bounded-parallelism/main.go

El código lo que hace es lo siguiente:

  1. en una rutina envia a un canal paths
  2. inicializa 20 workers, y cada worker va leyendo estos paths i digiriendo un hash que publica al canal de resultados, lo gestionas con un wg
  3. espero el wg y cierro el canal de resultados

Lo raro aqui:

  1. leo estos resultados...
  2. lo que no entiendo, una vez terminado de evaluar los resultados, miro si hay un error en el canal de error de 1.

Y no lo entiendo porque el range sobre el canal de c es bloqueante hasta que se cierre, por eso el punto 3 esta en una gorutina aparte para no bloquear el main.

Entonces este codigo, si ya he terminado todo el proceso, y cuando leo los resultados (4) ya compruebo si hay un error, no vale de nada

if err := <-errc; err != nil { // HLerrc
		return nil, err
	}

esto no es muy raro? no deberia estar usando un context (errGroup creo, no estoy seguro, pero coordinar los errores) y evaluar si ha habido un error en algun canal cuando evaluas los resultados?

desu

El ejercicio que tengo que hacer es este

https://github.com/gophercises/quiet_hn

Tenga una lista de 450 ids, en secuencial hace 1 peticio por id y va comprobando el resultado, si es bueno lo guarda y obtiene el top 30 mejores.

Yo lo tengo que pasar en paralelo y claro, lo jodido es como mantengo el orden? pienso que es tener un esquema de este estilo:

for worker in workers while not done and not completed:
   http request id
   write result channel

nota: hay otras alternativas como tener en lugar de 30 un buffer de 50 y devolver despues las 30 ordenadas para asegurarte, pero estoy buscando lo optimo.

la idea es que tengo 2 flags, el done para determinar que he terminado de llenar el buffer con el top 30 y el not completed de los workers, de esta manera si por ejemplo hago un batch en 5 workers, cada uno hara 6 peticiones http. cuando los 5 workers terminar seria el completed, pero si no he llenado mi buffer de 30 elementos, el canal done aun no estaria. por tanto tiraria otro batch de 5 workers (esto es facilmente adaptable dinamicamente segun los elementos que me falten a cada batch).

kidandcat

A #1, no veo donde esta el problema, tiene un channel para los paths, y tiene un segundo channel para los errores. Primero esta sacando todos los paths posibles, si hay error no detiene nada, pasa al siguiente, y luego evalua si ha habido error o en el channel de paths o en el de errores, esto es porque puede tener errores de diferentes fuentes:

Has probado a comprobar los errores a la hora de hacer el walk justo despues? Como tu dices no tiene mucho sentido esperar a procesar los checksums para cancelar al final y lanzar un error.

Para el orden como tu dices es lo mejor, simplemente ve guardando IDs, y luego ordenas.

Lo que si puedes hacer para optimizar al máximo es, como sabes con antelacion el tamaño de tu cache, usa un mapa indexado, de esa forma no tienes que ordenar nada después, sino que ve añadiendo los elementos conforme van terminando tus goroutinas directamente por el indice.

Ejemplo (sin probar):

cache := make(map[int]string, 30)
for i := 0; i < digesters; i++ {
		go func(int index) {
		        url := fetchURL()
                        cache[index] = url
			wg.Done()
		}(i)
	}
1 respuesta
desu

#3 No se si no me he explicado bien pero no me refería a esa función de WalkFiles, esa no veo ningún problema es un producer normal. Sino en la func Md5, que es el main y son los puntos 1,2,3,4,5 que he comentado. edit: aunque mirandolo en una segunda vuelta para cancelar la ejecucion del resto lo veo raruno tambien el tener ese done channel... nse me lo apunto para investigar mas adelante.

func MD5All(root string) (map[string][md5.Size]byte, error) {
	// MD5All closes the done channel when it returns; it may do so before
	// receiving all the values from c and errc.
	done := make(chan struct{})
	defer close(done)

paths, errc := walkFiles(done, root)

// Start a fixed number of goroutines to read and digest files.
c := make(chan result) // HLc
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
	go func() {
		digester(done, paths, c) // HLc
		wg.Done()
	}()
}
go func() {
	wg.Wait()
	close(c) // HLc
}()
// End of pipeline. OMIT

m := make(map[string][md5.Size]byte)
for r := range c {
	if r.err != nil {
		return nil, r.err
	}
	m[r.path] = r.sum
}
// Check whether the Walk failed.
if err := <-errc; err != nil { // HLerrc
	return nil, err
}
return m, nil

Que sentido tiene comprobar si el WalkFunc a fallado al final? El errc no esta bien usado porque no tiene sentido, debería tener en algún lado un errGroup. Otro dia me lo mirare, pero entiendo que la idea seria cancelar todo lo posterior a partir de estre errc cuando haya algo mal y devolver el error.

Sobre el segundo caso olvídalo, gracias igualmente. Es que lo movieron aquí sin sentido... Se como solucionarlo... Tu respuesta no es del todo correcta porque de esos 30 algunas pueden ser invalidas y requieres pedir mas... pero da igual como digo... Solo era un ejemplo de problema que resuelvo con estos patrones...

aqu un ejemplo de errGroup y usar el ctx.done para cancelar https://www.mariocarrion.com/2020/08/19/go-implementing-complex-pipelines-part-4.html la clave entiendo que es tener el case:

			case <-ctx.Done():

para propagar y detectar los cambios y cancelar todo con el ctx compartido entre gorutinas.

Yo inicializaria asi:

	g, ctx := errgroup.WithContext(context.Background())

y ejecutaria todo con este grupo y contexto, asi no necesitas canal de done ni error... cuando detectes que tu context esta listo mientras estas en runtime es que ha habido un error y lo propagas a tu grupo.

kidandcat

Lo de comprobar el WalkFunc al final solo lo entiendo si para el ejemplo lo han hecho asi pero a futuro quisieran no cancelar por un error, sino no tiene ningun sentido.

Me rayó un poco lo del segundo comentario, no lo conseguía relacionar al primero xD

desu

Lo tengo medio hecho... esto funciona:

spoiler

Ahora estoy tratando de hacer el result que me salta un deadlock... XD all gorutines asleep :(

pero vamos la idea que yo tengo es esta.

tengo una gorutina en un errogrup que lee paths
tengo N workers en este group que van haciendo digest y publicando en results (c chan result)
si hay cualquier error uso el ctx para cortar...

hago un g.Wait() y cierro los canales...

luego proceso el resultado

edit2: esto es lo mejor que tengo, no entiendo el deadlock aun

package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"

"golang.org/x/sync/errgroup"
)

func walkFiles(ctx context.Context, root string, paths chan<- string) error {
	return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.Mode().IsRegular() {
			return nil
		}
		select {
		case paths <- path:
		case <-ctx.Done():
			return ctx.Err()
		}
		return nil
	})
}

type result struct {
	path string
	sum  [md5.Size]byte
	err  error
}

func digester(ctx context.Context, paths <-chan string, c chan<- result) error {
	for {
		select {
		case path, open := <-paths:
			if !open {
				return nil
			}
			data, err := ioutil.ReadFile(path)
			select {
			case c <- result{path, md5.Sum(data), err}:
			case <-ctx.Done():
				return ctx.Err()
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func MD5All(root string) (map[string][md5.Size]byte, error) {
	g, ctx := errgroup.WithContext(context.Background())

paths := make(chan string)

g.Go(func() error {
	return walkFiles(ctx, root, paths)
})

c := make(chan result)

numDigesters := 1
for i := 0; i < numDigesters; i++ {
	g.Go(func() error {
		return digester(ctx, paths, c)
	})
}

go func() {
	g.Wait()
	close(paths)
	close(c)
}()

m := make(map[string][md5.Size]byte)
for {
	select {
	case r, open := <-c:
		if !open {
			return m, nil
		}
		if r.err != nil {
			return nil, r.err
		}
		m[r.path] = r.sum

	}
}
}

func main() {
	m, err := MD5All(os.Args[1])
	if err != nil {
		fmt.Println(err)
		return
	}
	var paths []string
	for path := range m {
		paths = append(paths, path)
	}
	sort.Strings(paths)
	for _, path := range paths {
		fmt.Printf("%x  %s\n", m[path], path)
	}
}

he cambiado los range a for select y en teoria asi no bloqueo no? y cuando se cierra XD

aver si alguien que sabe go me resuelve la duda de como se hace esto. es el primer ejercicio con errgroups y asi que hago.

edit3:

If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.

blocks ayy

desu

Bueno, ya he encontrado 1 bug y ahora ya funciona. he deshecho tambien la confusion del for select unblocking... solo tiene ese comportamiento con un default... ayy

spoiler

Tansolo me queda una duda y es porque necesito tener el wait en una gorutina? el close lo hago para el range de despues ... eso lo tengo claro, pero no veo el motivo de ponerlo a parte, entendia que el g.Wait() me bloqueaba el main y esperaba a que todas las gorutinas del grupo finalizasen... entonces no veo el bloqueo...

	go func() {
		if err := g.Wait(); err != nil {
			panic(err)
		}
		close(c)
	}()

he visto ejemplos donde lo hacen y otros donde no... asique estoy comprendiendo aun 1 cosa mas mal...

en este ejemplo bloquea el main: https://www.mariocarrion.com/2020/08/19/go-implementing-complex-pipelines-part-4.html

edit: toca en una gorutina porque al llegar al range del result desbloquea 1 de las gorutinas.

dejo este post a modo de diario y recuerdo, primer dia haciendo gorutinas... arreglando tutoriales deprecated kekekekek

si alguien tiene algo mas que aportar sobre ctx, errgroups o patrones de concurrencia adelante.

Usuarios habituales

  • desu
  • kidandcat