diff --git a/.idea/git_toolbox_prj.xml b/.idea/git_toolbox_prj.xml new file mode 100644 index 0000000..02b915b --- /dev/null +++ b/.idea/git_toolbox_prj.xml @@ -0,0 +1,15 @@ + + + + + + + \ No newline at end of file diff --git a/.idea/jsonSchemas.xml b/.idea/jsonSchemas.xml new file mode 100644 index 0000000..6fb00f9 --- /dev/null +++ b/.idea/jsonSchemas.xml @@ -0,0 +1,25 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/adapters/processors/pdf.go b/adapters/processors/pdf.go index 2098390..1dda6b8 100644 --- a/adapters/processors/pdf.go +++ b/adapters/processors/pdf.go @@ -29,8 +29,9 @@ func (P *PDF) Process(_ context.Context, url string) ([]entity.File, error) { gen.Title.Set(url) page := wkhtmltopdf.NewPage(url) + page.PrintMediaType.Set(true) page.JavascriptDelay.Set(200) - page.LoadMediaErrorHandling.Set("abort") + page.LoadMediaErrorHandling.Set("ignore") page.FooterRight.Set("[page]") page.HeaderLeft.Set(url) page.HeaderRight.Set(time.Now().Format(time.DateOnly)) diff --git a/adapters/processors/processors.go b/adapters/processors/processors.go index 1b36155..88f0a43 100644 --- a/adapters/processors/processors.go +++ b/adapters/processors/processors.go @@ -69,7 +69,7 @@ func (p *Processors) Process(ctx context.Context, format entity.Format, url stri proc, ok := p.processors[format] if !ok { - result.Err = fmt.Errorf("no processor registered for format %v", format) + result.Err = fmt.Errorf("no processor registered") return result } @@ -86,7 +86,7 @@ func (p *Processors) Process(ctx context.Context, format entity.Format, url stri return result } -func (p *Processors) Override(format entity.Format, proc processor) error { +func (p *Processors) OverrideProcessor(format entity.Format, proc processor) error { p.processors[format] = proc return nil diff --git a/adapters/processors/simplesample.pdf b/adapters/processors/simplesample.pdf deleted file mode 100644 index 7a94299..0000000 Binary files a/adapters/processors/simplesample.pdf and /dev/null differ diff --git a/adapters/repository/badger/file.go b/adapters/repository/badger/file.go deleted file mode 100644 index 6d9512b..0000000 --- a/adapters/repository/badger/file.go +++ /dev/null @@ -1,40 +0,0 @@ -package badger - -import ( - "context" - "fmt" - - "github.com/dgraph-io/badger/v4" - - "github.com/derfenix/webarchive/entity" -) - -func NewFile(db *badger.DB) *File { - return &File{db: db, prefix: []byte("file:")} -} - -type File struct { - db *badger.DB - prefix []byte -} - -func (f *File) SaveTx(_ context.Context, txn *badger.Txn, file *entity.File) error { - if f.db.IsClosed() { - return ErrDBClosed - } - - marshaled, err := marshal(file) - if err != nil { - return fmt.Errorf("marshal data: %w", err) - } - - if err := txn.Set(f.key(file), marshaled); err != nil { - return fmt.Errorf("put data: %w", err) - } - - return nil -} - -func (f *File) key(file *entity.File) []byte { - return append(f.prefix, []byte(file.ID.String())...) -} diff --git a/adapters/repository/badger/page.go b/adapters/repository/badger/page.go index ddb0f19..2c2f360 100644 --- a/adapters/repository/badger/page.go +++ b/adapters/repository/badger/page.go @@ -11,21 +11,60 @@ import ( "github.com/derfenix/webarchive/entity" ) -func NewPage(db *badger.DB, file *File) (*Page, error) { +func NewPage(db *badger.DB) (*Page, error) { return &Page{ db: db, prefix: []byte("page:"), - file: file, }, nil } type Page struct { db *badger.DB prefix []byte - file *File } -func (p *Page) Save(ctx context.Context, site *entity.Page) error { +func (p *Page) GetFile(_ context.Context, pageID, fileID uuid.UUID) (*entity.File, error) { + page := entity.Page{ID: pageID} + var file *entity.File + + err := p.db.View(func(txn *badger.Txn) error { + data, err := txn.Get(p.key(&page)) + if err != nil { + return fmt.Errorf("get data: %w", err) + } + + err = data.Value(func(val []byte) error { + if err := unmarshal(val, &page); err != nil { + return fmt.Errorf("unmarshal data: %w", err) + } + + return nil + }) + if err != nil { + return fmt.Errorf("get value: %w", err) + } + + for i := range page.Results.Results() { + for j := range page.Results.Results()[i].Files { + ff := &page.Results.Results()[i].Files[j] + + if ff.ID == fileID { + file = ff + } + } + + } + + return nil + }) + if err != nil { + return nil, fmt.Errorf("view: %w", err) + } + + return file, nil +} + +func (p *Page) Save(_ context.Context, site *entity.Page) error { if p.db.IsClosed() { return ErrDBClosed } @@ -40,14 +79,6 @@ func (p *Page) Save(ctx context.Context, site *entity.Page) error { return fmt.Errorf("put data: %w", err) } - for i, result := range site.Results.Results() { - for j, file := range result.Files { - if err := p.file.SaveTx(ctx, txn, &file); err != nil { - return fmt.Errorf("save file %d (%s) for result %d: %w", j, file.ID.String(), i, err) - } - } - } - return nil }); err != nil { return fmt.Errorf("update db: %w", err) diff --git a/api/openapi.yaml b/api/openapi.yaml index 24b0f2e..33efb7c 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -1,4 +1,4 @@ -openapi: 3.0.1 +openapi: 3.1.0 info: title: Sample API description: API description in Markdown. @@ -71,6 +71,39 @@ paths: default: $ref: '#/components/responses/undefinedError' + /pages/{id}/file/{file_id}: + parameters: + - in: path + name: id + required: true + schema: + type: string + format: uuid + - in: path + name: file_id + required: true + schema: + type: string + format: uuid + get: + operationId: getFile + description: Get file content + responses: + 200: + description: File content + content: + application/pdf: {} + text/plain: + schema: + type: string + text/html: + schema: + type: string + 404: + description: Page of file not found + default: + $ref: '#/components/responses/undefinedError' + components: responses: undefinedError: diff --git a/api/openapi/oas_client_gen.go b/api/openapi/oas_client_gen.go index 912bc8c..1a1763d 100644 --- a/api/openapi/oas_client_gen.go +++ b/api/openapi/oas_client_gen.go @@ -160,6 +160,113 @@ func (c *Client) sendAddPage(ctx context.Context, request OptAddPageReq) (res *P return result, nil } +// GetFile invokes getFile operation. +// +// Get file content. +// +// GET /pages/{id}/file/{file_id} +func (c *Client) GetFile(ctx context.Context, params GetFileParams) (GetFileRes, error) { + res, err := c.sendGetFile(ctx, params) + _ = res + return res, err +} + +func (c *Client) sendGetFile(ctx context.Context, params GetFileParams) (res GetFileRes, err error) { + otelAttrs := []attribute.KeyValue{ + otelogen.OperationID("getFile"), + } + + // Run stopwatch. + startTime := time.Now() + defer func() { + elapsedDuration := time.Since(startTime) + c.duration.Record(ctx, elapsedDuration.Microseconds(), otelAttrs...) + }() + + // Increment request counter. + c.requests.Add(ctx, 1, otelAttrs...) + + // Start a span for this request. + ctx, span := c.cfg.Tracer.Start(ctx, "GetFile", + trace.WithAttributes(otelAttrs...), + clientSpanKind, + ) + // Track stage for error reporting. + var stage string + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, stage) + c.errors.Add(ctx, 1, otelAttrs...) + } + span.End() + }() + + stage = "BuildURL" + u := uri.Clone(c.requestURL(ctx)) + var pathParts [4]string + pathParts[0] = "/pages/" + { + // Encode "id" parameter. + e := uri.NewPathEncoder(uri.PathEncoderConfig{ + Param: "id", + Style: uri.PathStyleSimple, + Explode: false, + }) + if err := func() error { + return e.EncodeValue(conv.UUIDToString(params.ID)) + }(); err != nil { + return res, errors.Wrap(err, "encode path") + } + encoded, err := e.Result() + if err != nil { + return res, errors.Wrap(err, "encode path") + } + pathParts[1] = encoded + } + pathParts[2] = "/file/" + { + // Encode "file_id" parameter. + e := uri.NewPathEncoder(uri.PathEncoderConfig{ + Param: "file_id", + Style: uri.PathStyleSimple, + Explode: false, + }) + if err := func() error { + return e.EncodeValue(conv.UUIDToString(params.FileID)) + }(); err != nil { + return res, errors.Wrap(err, "encode path") + } + encoded, err := e.Result() + if err != nil { + return res, errors.Wrap(err, "encode path") + } + pathParts[3] = encoded + } + uri.AddPathParts(u, pathParts[:]...) + + stage = "EncodeRequest" + r, err := ht.NewRequest(ctx, "GET", u, nil) + if err != nil { + return res, errors.Wrap(err, "create request") + } + + stage = "SendRequest" + resp, err := c.cfg.Client.Do(r) + if err != nil { + return res, errors.Wrap(err, "do request") + } + defer resp.Body.Close() + + stage = "DecodeResponse" + result, err := decodeGetFileResponse(resp) + if err != nil { + return res, errors.Wrap(err, "decode response") + } + + return result, nil +} + // GetPage invokes getPage operation. // // Get page details. diff --git a/api/openapi/oas_handlers_gen.go b/api/openapi/oas_handlers_gen.go index 5676049..eec8fad 100644 --- a/api/openapi/oas_handlers_gen.go +++ b/api/openapi/oas_handlers_gen.go @@ -129,6 +129,120 @@ func (s *Server) handleAddPageRequest(args [0]string, argsEscaped bool, w http.R } } +// handleGetFileRequest handles getFile operation. +// +// Get file content. +// +// GET /pages/{id}/file/{file_id} +func (s *Server) handleGetFileRequest(args [2]string, argsEscaped bool, w http.ResponseWriter, r *http.Request) { + otelAttrs := []attribute.KeyValue{ + otelogen.OperationID("getFile"), + semconv.HTTPMethodKey.String("GET"), + semconv.HTTPRouteKey.String("/pages/{id}/file/{file_id}"), + } + + // Start a span for this request. + ctx, span := s.cfg.Tracer.Start(r.Context(), "GetFile", + trace.WithAttributes(otelAttrs...), + serverSpanKind, + ) + defer span.End() + + // Run stopwatch. + startTime := time.Now() + defer func() { + elapsedDuration := time.Since(startTime) + s.duration.Record(ctx, elapsedDuration.Microseconds(), otelAttrs...) + }() + + // Increment request counter. + s.requests.Add(ctx, 1, otelAttrs...) + + var ( + recordError = func(stage string, err error) { + span.RecordError(err) + span.SetStatus(codes.Error, stage) + s.errors.Add(ctx, 1, otelAttrs...) + } + err error + opErrContext = ogenerrors.OperationContext{ + Name: "GetFile", + ID: "getFile", + } + ) + params, err := decodeGetFileParams(args, argsEscaped, r) + if err != nil { + err = &ogenerrors.DecodeParamsError{ + OperationContext: opErrContext, + Err: err, + } + recordError("DecodeParams", err) + s.cfg.ErrorHandler(ctx, w, r, err) + return + } + + var response GetFileRes + if m := s.cfg.Middleware; m != nil { + mreq := middleware.Request{ + Context: ctx, + OperationName: "GetFile", + OperationID: "getFile", + Body: nil, + Params: middleware.Parameters{ + { + Name: "id", + In: "path", + }: params.ID, + { + Name: "file_id", + In: "path", + }: params.FileID, + }, + Raw: r, + } + + type ( + Request = struct{} + Params = GetFileParams + Response = GetFileRes + ) + response, err = middleware.HookMiddleware[ + Request, + Params, + Response, + ]( + m, + mreq, + unpackGetFileParams, + func(ctx context.Context, request Request, params Params) (response Response, err error) { + response, err = s.h.GetFile(ctx, params) + return response, err + }, + ) + } else { + response, err = s.h.GetFile(ctx, params) + } + if err != nil { + recordError("Internal", err) + if errRes, ok := errors.Into[*ErrorStatusCode](err); ok { + encodeErrorResponse(errRes, w, span) + return + } + if errors.Is(err, ht.ErrNotImplemented) { + s.cfg.ErrorHandler(ctx, w, r, err) + return + } + encodeErrorResponse(s.h.NewError(ctx, err), w, span) + return + } + + if err := encodeGetFileResponse(response, w, span); err != nil { + recordError("EncodeResponse", err) + s.cfg.ErrorHandler(ctx, w, r, err) + return + } +} + // handleGetPageRequest handles getPage operation. // // Get page details. diff --git a/api/openapi/oas_interfaces_gen.go b/api/openapi/oas_interfaces_gen.go index 5bbe0d4..b1b2f70 100644 --- a/api/openapi/oas_interfaces_gen.go +++ b/api/openapi/oas_interfaces_gen.go @@ -1,6 +1,10 @@ // Code generated by ogen, DO NOT EDIT. package openapi +type GetFileRes interface { + getFileRes() +} + type GetPageRes interface { getPageRes() } diff --git a/api/openapi/oas_parameters_gen.go b/api/openapi/oas_parameters_gen.go index a0504da..f76812a 100644 --- a/api/openapi/oas_parameters_gen.go +++ b/api/openapi/oas_parameters_gen.go @@ -16,6 +16,124 @@ import ( "github.com/ogen-go/ogen/validate" ) +// GetFileParams is parameters of getFile operation. +type GetFileParams struct { + ID uuid.UUID + FileID uuid.UUID +} + +func unpackGetFileParams(packed middleware.Parameters) (params GetFileParams) { + { + key := middleware.ParameterKey{ + Name: "id", + In: "path", + } + params.ID = packed[key].(uuid.UUID) + } + { + key := middleware.ParameterKey{ + Name: "file_id", + In: "path", + } + params.FileID = packed[key].(uuid.UUID) + } + return params +} + +func decodeGetFileParams(args [2]string, argsEscaped bool, r *http.Request) (params GetFileParams, _ error) { + // Decode path: id. + if err := func() error { + param := args[0] + if argsEscaped { + unescaped, err := url.PathUnescape(args[0]) + if err != nil { + return errors.Wrap(err, "unescape path") + } + param = unescaped + } + if len(param) > 0 { + d := uri.NewPathDecoder(uri.PathDecoderConfig{ + Param: "id", + Value: param, + Style: uri.PathStyleSimple, + Explode: false, + }) + + if err := func() error { + val, err := d.DecodeValue() + if err != nil { + return err + } + + c, err := conv.ToUUID(val) + if err != nil { + return err + } + + params.ID = c + return nil + }(); err != nil { + return err + } + } else { + return validate.ErrFieldRequired + } + return nil + }(); err != nil { + return params, &ogenerrors.DecodeParamError{ + Name: "id", + In: "path", + Err: err, + } + } + // Decode path: file_id. + if err := func() error { + param := args[1] + if argsEscaped { + unescaped, err := url.PathUnescape(args[1]) + if err != nil { + return errors.Wrap(err, "unescape path") + } + param = unescaped + } + if len(param) > 0 { + d := uri.NewPathDecoder(uri.PathDecoderConfig{ + Param: "file_id", + Value: param, + Style: uri.PathStyleSimple, + Explode: false, + }) + + if err := func() error { + val, err := d.DecodeValue() + if err != nil { + return err + } + + c, err := conv.ToUUID(val) + if err != nil { + return err + } + + params.FileID = c + return nil + }(); err != nil { + return err + } + } else { + return validate.ErrFieldRequired + } + return nil + }(); err != nil { + return params, &ogenerrors.DecodeParamError{ + Name: "file_id", + In: "path", + Err: err, + } + } + return params, nil +} + // GetPageParams is parameters of getPage operation. type GetPageParams struct { ID uuid.UUID diff --git a/api/openapi/oas_response_decoders_gen.go b/api/openapi/oas_response_decoders_gen.go index b50c296..96f4f71 100644 --- a/api/openapi/oas_response_decoders_gen.go +++ b/api/openapi/oas_response_decoders_gen.go @@ -3,6 +3,7 @@ package openapi import ( + "bytes" "io" "mime" "net/http" @@ -97,6 +98,94 @@ func decodeAddPageResponse(resp *http.Response) (res *Page, err error) { return res, errors.Wrap(defRes, "error") } +func decodeGetFileResponse(resp *http.Response) (res GetFileRes, err error) { + switch resp.StatusCode { + case 200: + // Code 200. + ct, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) + if err != nil { + return res, errors.Wrap(err, "parse media type") + } + switch { + case ct == "application/pdf": + reader := resp.Body + b, err := io.ReadAll(reader) + if err != nil { + return res, err + } + + response := GetFileOKApplicationPdf{Data: bytes.NewReader(b)} + return &response, nil + case ct == "text/html": + reader := resp.Body + b, err := io.ReadAll(reader) + if err != nil { + return res, err + } + + response := GetFileOKTextHTML{Data: bytes.NewReader(b)} + return &response, nil + case ct == "text/plain": + reader := resp.Body + b, err := io.ReadAll(reader) + if err != nil { + return res, err + } + + response := GetFileOKTextPlain{Data: bytes.NewReader(b)} + return &response, nil + default: + return res, validate.InvalidContentType(ct) + } + case 404: + // Code 404. + return &GetFileNotFound{}, nil + } + // Convenient error response. + defRes, err := func() (res *ErrorStatusCode, err error) { + ct, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) + if err != nil { + return res, errors.Wrap(err, "parse media type") + } + switch { + case ct == "application/json": + buf, err := io.ReadAll(resp.Body) + if err != nil { + return res, err + } + d := jx.DecodeBytes(buf) + + var response Error + if err := func() error { + if err := response.Decode(d); err != nil { + return err + } + if err := d.Skip(); err != io.EOF { + return errors.New("unexpected trailing data") + } + return nil + }(); err != nil { + err = &ogenerrors.DecodeBodyError{ + ContentType: ct, + Body: buf, + Err: err, + } + return res, err + } + return &ErrorStatusCode{ + StatusCode: resp.StatusCode, + Response: response, + }, nil + default: + return res, validate.InvalidContentType(ct) + } + }() + if err != nil { + return res, errors.Wrap(err, "default") + } + return res, errors.Wrap(defRes, "error") +} + func decodeGetPageResponse(resp *http.Response) (res GetPageRes, err error) { switch resp.StatusCode { case 200: diff --git a/api/openapi/oas_response_encoders_gen.go b/api/openapi/oas_response_encoders_gen.go index 54265cd..b0ecbaa 100644 --- a/api/openapi/oas_response_encoders_gen.go +++ b/api/openapi/oas_response_encoders_gen.go @@ -3,6 +3,7 @@ package openapi import ( + "io" "net/http" "github.com/go-faster/errors" @@ -24,6 +25,52 @@ func encodeAddPageResponse(response *Page, w http.ResponseWriter, span trace.Spa return nil } +func encodeGetFileResponse(response GetFileRes, w http.ResponseWriter, span trace.Span) error { + switch response := response.(type) { + case *GetFileOKApplicationPdf: + w.Header().Set("Content-Type", "application/pdf") + w.WriteHeader(200) + span.SetStatus(codes.Ok, http.StatusText(200)) + + writer := w + if _, err := io.Copy(writer, response); err != nil { + return errors.Wrap(err, "write") + } + return nil + + case *GetFileOKTextHTML: + w.Header().Set("Content-Type", "text/html") + w.WriteHeader(200) + span.SetStatus(codes.Ok, http.StatusText(200)) + + writer := w + if _, err := io.Copy(writer, response); err != nil { + return errors.Wrap(err, "write") + } + return nil + + case *GetFileOKTextPlain: + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(200) + span.SetStatus(codes.Ok, http.StatusText(200)) + + writer := w + if _, err := io.Copy(writer, response); err != nil { + return errors.Wrap(err, "write") + } + return nil + + case *GetFileNotFound: + w.WriteHeader(404) + span.SetStatus(codes.Error, http.StatusText(404)) + + return nil + + default: + return errors.Errorf("unexpected response type: %T", response) + } +} + func encodeGetPageResponse(response GetPageRes, w http.ResponseWriter, span trace.Span) error { switch response := response.(type) { case *PageWithResults: diff --git a/api/openapi/oas_router_gen.go b/api/openapi/oas_router_gen.go index 04d5d8c..b741255 100644 --- a/api/openapi/oas_router_gen.go +++ b/api/openapi/oas_router_gen.go @@ -35,7 +35,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.notFound(w, r) return } - args := [1]string{} + args := [2]string{} // Static code generated router with unwrapped path search. switch { @@ -72,12 +72,15 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // Param: "id" - // Leaf parameter - args[0] = elem - elem = "" + // Match until "/" + idx := strings.IndexByte(elem, '/') + if idx < 0 { + idx = len(elem) + } + args[0] = elem[:idx] + elem = elem[idx:] if len(elem) == 0 { - // Leaf node. switch r.Method { case "GET": s.handleGetPageRequest([1]string{ @@ -89,6 +92,34 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + switch elem[0] { + case '/': // Prefix: "/file/" + if l := len("/file/"); len(elem) >= l && elem[0:l] == "/file/" { + elem = elem[l:] + } else { + break + } + + // Param: "file_id" + // Leaf parameter + args[1] = elem + elem = "" + + if len(elem) == 0 { + // Leaf node. + switch r.Method { + case "GET": + s.handleGetFileRequest([2]string{ + args[0], + args[1], + }, elemIsEscaped, w, r) + default: + s.notAllowed(w, r, "GET") + } + + return + } + } } } } @@ -101,7 +132,7 @@ type Route struct { operationID string pathPattern string count int - args [1]string + args [2]string } // Name returns ogen operation name. @@ -195,14 +226,17 @@ func (s *Server) FindPath(method string, u *url.URL) (r Route, _ bool) { } // Param: "id" - // Leaf parameter - args[0] = elem - elem = "" + // Match until "/" + idx := strings.IndexByte(elem, '/') + if idx < 0 { + idx = len(elem) + } + args[0] = elem[:idx] + elem = elem[idx:] if len(elem) == 0 { switch method { case "GET": - // Leaf: GetPage r.name = "GetPage" r.operationID = "getPage" r.pathPattern = "/pages/{id}" @@ -213,6 +247,34 @@ func (s *Server) FindPath(method string, u *url.URL) (r Route, _ bool) { return } } + switch elem[0] { + case '/': // Prefix: "/file/" + if l := len("/file/"); len(elem) >= l && elem[0:l] == "/file/" { + elem = elem[l:] + } else { + break + } + + // Param: "file_id" + // Leaf parameter + args[1] = elem + elem = "" + + if len(elem) == 0 { + switch method { + case "GET": + // Leaf: GetFile + r.name = "GetFile" + r.operationID = "getFile" + r.pathPattern = "/pages/{id}/file/{file_id}" + r.args = args + r.count = 2 + return r, true + default: + return + } + } + } } } } diff --git a/api/openapi/oas_schemas_gen.go b/api/openapi/oas_schemas_gen.go index 2cda604..886462b 100644 --- a/api/openapi/oas_schemas_gen.go +++ b/api/openapi/oas_schemas_gen.go @@ -4,6 +4,7 @@ package openapi import ( "fmt" + "io" "time" "github.com/go-faster/errors" @@ -148,6 +149,50 @@ func (s *Format) UnmarshalText(data []byte) error { } } +// GetFileNotFound is response for GetFile operation. +type GetFileNotFound struct{} + +func (*GetFileNotFound) getFileRes() {} + +type GetFileOKApplicationPdf struct { + Data io.Reader +} + +// Read reads data from the Data reader. +// +// Kept to satisfy the io.Reader interface. +func (s GetFileOKApplicationPdf) Read(p []byte) (n int, err error) { + return s.Data.Read(p) +} + +func (*GetFileOKApplicationPdf) getFileRes() {} + +type GetFileOKTextHTML struct { + Data io.Reader +} + +// Read reads data from the Data reader. +// +// Kept to satisfy the io.Reader interface. +func (s GetFileOKTextHTML) Read(p []byte) (n int, err error) { + return s.Data.Read(p) +} + +func (*GetFileOKTextHTML) getFileRes() {} + +type GetFileOKTextPlain struct { + Data io.Reader +} + +// Read reads data from the Data reader. +// +// Kept to satisfy the io.Reader interface. +func (s GetFileOKTextPlain) Read(p []byte) (n int, err error) { + return s.Data.Read(p) +} + +func (*GetFileOKTextPlain) getFileRes() {} + // GetPageNotFound is response for GetPage operation. type GetPageNotFound struct{} diff --git a/api/openapi/oas_server_gen.go b/api/openapi/oas_server_gen.go index 646ed6f..add7ea3 100644 --- a/api/openapi/oas_server_gen.go +++ b/api/openapi/oas_server_gen.go @@ -14,6 +14,12 @@ type Handler interface { // // POST /pages AddPage(ctx context.Context, req OptAddPageReq) (*Page, error) + // GetFile implements getFile operation. + // + // Get file content. + // + // GET /pages/{id}/file/{file_id} + GetFile(ctx context.Context, params GetFileParams) (GetFileRes, error) // GetPage implements getPage operation. // // Get page details. diff --git a/api/openapi/oas_unimplemented_gen.go b/api/openapi/oas_unimplemented_gen.go index 03c52e6..934ecd8 100644 --- a/api/openapi/oas_unimplemented_gen.go +++ b/api/openapi/oas_unimplemented_gen.go @@ -22,6 +22,15 @@ func (UnimplementedHandler) AddPage(ctx context.Context, req OptAddPageReq) (r * return r, ht.ErrNotImplemented } +// GetFile implements getFile operation. +// +// Get file content. +// +// GET /pages/{id}/file/{file_id} +func (UnimplementedHandler) GetFile(ctx context.Context, params GetFileParams) (r GetFileRes, _ error) { + return r, ht.ErrNotImplemented +} + // GetPage implements getPage operation. // // Get page details. diff --git a/application/application.go b/application/application.go index 27d51f4..0e0ad92 100644 --- a/application/application.go +++ b/application/application.go @@ -33,8 +33,7 @@ func NewApplication(cfg Config) (Application, error) { return Application{}, fmt.Errorf("new badger: %w", err) } - fileRepo := badgerRepo.NewFile(db) - pageRepo, err := badgerRepo.NewPage(db, fileRepo) + pageRepo, err := badgerRepo.NewPage(db) if err != nil { return Application{}, fmt.Errorf("new page repo: %w", err) } @@ -44,8 +43,11 @@ func NewApplication(cfg Config) (Application, error) { return Application{}, fmt.Errorf("new processors: %w", err) } + workerCh := make(chan *entity.Page) + worker := entity.NewWorker(workerCh, pageRepo, processor, log.Named("worker")) + server, err := openapi.NewServer( - rest.NewService(pageRepo), + rest.NewService(pageRepo, workerCh), openapi.WithMiddleware( func(r middleware.Request, next middleware.Next) (middleware.Response, error) { start := time.Now() @@ -85,22 +87,21 @@ func NewApplication(cfg Config) (Application, error) { db: db, processor: processor, httpServer: &httpServer, + worker: worker, pageRepo: pageRepo, - fileRepo: fileRepo, }, nil } type Application struct { - cfg Config - log *zap.Logger - db *badger.DB - processor entity.Processor - + cfg Config + log *zap.Logger + db *badger.DB + processor entity.Processor httpServer *http.Server + worker *entity.Worker pageRepo *badgerRepo.Page - fileRepo *badgerRepo.File } func (a *Application) Log() *zap.Logger { @@ -108,12 +109,14 @@ func (a *Application) Log() *zap.Logger { } func (a *Application) Start(ctx context.Context, wg *sync.WaitGroup) error { - wg.Add(2) + wg.Add(3) a.httpServer.BaseContext = func(net.Listener) context.Context { return ctx } + go a.worker.Start(ctx, wg) + go func() { defer wg.Done() diff --git a/entity/page.go b/entity/page.go index fc86cad..508d143 100644 --- a/entity/page.go +++ b/entity/page.go @@ -57,9 +57,7 @@ func (p *Page) SetProcessing() { p.Status = StatusProcessing } -func (p *Page) Process(ctx context.Context, wg *sync.WaitGroup, processor Processor) { - defer wg.Done() - +func (p *Page) Process(ctx context.Context, processor Processor) { innerWG := sync.WaitGroup{} innerWG.Add(len(p.Formats)) @@ -78,6 +76,8 @@ func (p *Page) Process(ctx context.Context, wg *sync.WaitGroup, processor Proces }(format) } + innerWG.Wait() + var hasResultWithOutErrors bool for _, result := range p.Results.Results() { if result.Err != nil { @@ -94,6 +94,4 @@ func (p *Page) Process(ctx context.Context, wg *sync.WaitGroup, processor Proces if p.Status == StatusProcessing { p.Status = StatusDone } - - innerWG.Wait() } diff --git a/entity/result.go b/entity/result.go index f2bb9f9..b7a2e15 100644 --- a/entity/result.go +++ b/entity/result.go @@ -22,12 +22,14 @@ func (r *Results) MarshalMsgpack() ([]byte, error) { } func (r *Results) UnmarshalMsgpack(b []byte) error { - return msgpack.Unmarshal(b, r.results) + return msgpack.Unmarshal(b, &r.results) } func (r *Results) Add(result Result) { r.mu.Lock() - r.results = append(r.results, result) + results := r.results + results = append(results, result) + r.results = results r.mu.Unlock() } diff --git a/entity/worker.go b/entity/worker.go new file mode 100644 index 0000000..8630aae --- /dev/null +++ b/entity/worker.go @@ -0,0 +1,66 @@ +package entity + +import ( + "context" + "sync" + + "go.uber.org/zap" +) + +type Pages interface { + Save(ctx context.Context, page *Page) error +} + +func NewWorker(ch chan *Page, pages Pages, processor Processor, log *zap.Logger) *Worker { + return &Worker{pages: pages, processor: processor, log: log, ch: ch} +} + +type Worker struct { + ch chan *Page + pages Pages + processor Processor + log *zap.Logger +} + +func (w *Worker) Start(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + + w.log.Info("starting") + + for { + select { + case <-ctx.Done(): + return + + case page, open := <-w.ch: + if !open { + w.log.Warn("channel closed") + return + } + + log := w.log.With(zap.Stringer("page_id", page.ID), zap.String("page_url", page.URL)) + + log.Info("got new page") + + wg.Add(1) + go w.do(ctx, wg, page, log) + } + } +} + +func (w *Worker) do(ctx context.Context, wg *sync.WaitGroup, page *Page, log *zap.Logger) { + defer wg.Done() + + page.Process(ctx, w.processor) + + log.Debug("page processed") + + if err := w.pages.Save(ctx, page); err != nil { + w.log.Error( + "failed to save processed page", + zap.String("page_id", page.ID.String()), + zap.String("page_url", page.URL), + zap.Error(err), + ) + } +} diff --git a/ports/rest/converter.go b/ports/rest/converter.go index e2252b6..edfd47f 100644 --- a/ports/rest/converter.go +++ b/ports/rest/converter.go @@ -24,18 +24,23 @@ func PageToRestWithResults(page *entity.Page) openapi.PageWithResults { results := make([]openapi.Result, len(page.Results.Results())) for i := range results { - result := &page.Results.Results()[i] + result := &(page.Results.Results())[i] + + errText := openapi.OptString{} + if result.Err != nil { + errText = openapi.NewOptString(result.Err.Error()) + } results[i] = openapi.Result{ Format: FormatToRest(result.Format), - Error: openapi.NewOptString(result.Err.Error()), + Error: errText, Files: func() []openapi.ResultFilesItem { - files := make([]openapi.ResultFilesItem, len(results[i].Files)) + files := make([]openapi.ResultFilesItem, len(result.Files)) for j := range files { file := &result.Files[j] - files[i] = openapi.ResultFilesItem{ + files[j] = openapi.ResultFilesItem{ ID: file.ID, Name: file.Name, Mimetype: file.MimeType, diff --git a/ports/rest/service.go b/ports/rest/service.go index c45f518..fe0a595 100644 --- a/ports/rest/service.go +++ b/ports/rest/service.go @@ -1,9 +1,11 @@ package rest import ( + "bytes" "context" "fmt" "net/http" + "strings" "github.com/google/uuid" @@ -14,16 +16,18 @@ import ( type Pages interface { ListAll(ctx context.Context) ([]*entity.Page, error) Save(ctx context.Context, site *entity.Page) error - Get(_ context.Context, id uuid.UUID) (*entity.Page, error) + Get(ctx context.Context, id uuid.UUID) (*entity.Page, error) + GetFile(ctx context.Context, pageID, fileID uuid.UUID) (*entity.File, error) } -func NewService(sites Pages) *Service { - return &Service{pages: sites} +func NewService(sites Pages, ch chan *entity.Page) *Service { + return &Service{pages: sites, ch: ch} } type Service struct { openapi.UnimplementedHandler pages Pages + ch chan *entity.Page } func (s *Service) GetPage(ctx context.Context, params openapi.GetPageParams) (openapi.GetPageRes, error) { @@ -38,14 +42,18 @@ func (s *Service) GetPage(ctx context.Context, params openapi.GetPageParams) (op } func (s *Service) AddPage(ctx context.Context, req openapi.OptAddPageReq) (*openapi.Page, error) { - site := entity.NewPage(req.Value.URL, req.Value.Description.Value, FormatFromRest(req.Value.Formats)...) + page := entity.NewPage(req.Value.URL, req.Value.Description.Value, FormatFromRest(req.Value.Formats)...) - err := s.pages.Save(ctx, site) + page.Status = entity.StatusProcessing + + err := s.pages.Save(ctx, page) if err != nil { - return nil, fmt.Errorf("save site: %w", err) + return nil, fmt.Errorf("save page: %w", err) } - res := PageToRest(site) + s.ch <- page + + res := PageToRest(page) return &res, nil } @@ -64,6 +72,27 @@ func (s *Service) GetPages(ctx context.Context) (openapi.Pages, error) { return res, nil } +func (s *Service) GetFile(ctx context.Context, params openapi.GetFileParams) (openapi.GetFileRes, error) { + file, err := s.pages.GetFile(ctx, params.ID, params.FileID) + if err != nil { + return &openapi.GetFileNotFound{}, nil + } + + switch { + case file.MimeType == "application/pdf": + return &openapi.GetFileOKApplicationPdf{Data: bytes.NewReader(file.Data)}, nil + + case strings.HasPrefix(file.MimeType, "text/plain"): + return &openapi.GetFileOKTextPlain{Data: bytes.NewReader(file.Data)}, nil + + case strings.HasPrefix(file.MimeType, "text/html"): + return &openapi.GetFileOKTextHTML{Data: bytes.NewReader(file.Data)}, nil + + default: + return nil, fmt.Errorf("unsupported mimetype: %s", file.MimeType) + } +} + func (s *Service) NewError(_ context.Context, err error) *openapi.ErrorStatusCode { return &openapi.ErrorStatusCode{ StatusCode: http.StatusInternalServerError,