Introduction
In the first part of our tutorial, we established the groundwork for our Google Drive downloader in Go, setting up the project and integrating Google OAuth for secure access to files. Now, in Part 2, we'll dive into the core functionality of our downloader: handling multiple concurrent downloads, updating progress in real time, handling cancellations, and more.
If you haven't read the previous part where it all began, please do so first to grasp the entire logic correctly.
Creating Downloader Store
Handling multiple concurrent downloads while updating progress for each one can be complex. We need to track the progress of each file download individually, manage errors effectively, and ensure that cancellations are handled gracefully.
In this section, we'll implement a Downloader store that manages multiple concurrent downloads and updates their progress in real time.
// Better to keep this in a separate place
type Progress struct {
    FileID       string    `json:"file_id"`
    Total        int64     `json:"total"`
    Current      int       `json:"current"`
    Complete     bool      `json:"complete"`
    ReadableSize string    `json:"readableSize"`
    StartTime    time.Time `json:"startTime"`
    EndTime      time.Time `json:"endTime"`
    Speed        float64   `json:"speed"`
}
type Downloader struct {
    progressChans      map[string]chan Progress
    ErrChans           map[string]chan error
    FileIds            []string
    DestinationPath    string
    PendingDownloads   map[string]Progress
    cancelFuncs        map[string]context.CancelFunc
    pendingDownloadsMu sync.RWMutex
}
func NewDownloader(fileIds []string, destinationPath string) *Downloader {
    return &Downloader{
        progressChans:    make(map[string]chan Progress),
        ErrChans:         make(map[string]chan error),
        FileIds:          fileIds,
        DestinationPath:  destinationPath,
        PendingDownloads: make(map[string]Progress),
        cancelFuncs:      make(map[string]context.CancelFunc),
    }
}

- progressChans: Tracks the progress of each download, with each file having its own channel for reporting download progress.
- ErrChans: Collects errors for each download, allowing errors to be handled per file without disrupting other downloads.
- FileIds: Contains the list of file IDs that need to be downloaded.
- DestinationPath: Specifies where the downloaded files will be stored.
- PendingDownloads: Holds the current progress of active downloads, allowing you to query the status at any time.
- cancelFuncs: Stores cancel functions for each download, enabling cancellation of downloads on demand.
- pendingDownloadsMu: Ensures thread-safe access to the PendingDownloads map, preventing race conditions when multiple goroutines read or update progress.
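To make the moving parts concrete, here is a minimal, hypothetical sketch of wiring the store together. The file IDs and destination path are placeholders, and GetPendingDownloads is the helper implemented just below:

// Hypothetical wiring: two Drive file IDs and a local destination folder.
d := NewDownloader([]string{"fileID-1", "fileID-2"}, "/tmp/downloads")

// Later (e.g. from an HTTP handler), the state of all downloads can be
// read safely thanks to pendingDownloadsMu.
if pendings, err := d.GetPendingDownloads(); err == nil {
    for _, p := range pendings {
        fmt.Printf("%s: %d%% of %s\n", p.FileID, p.Current, p.ReadableSize)
    }
}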
Implementing Core Functions for Download Management
Let's start off with the easy ones which will be used in the main StartDownload function.
GetPendingDownloads
func (d *Downloader) GetPendingDownloads() ([]Progress, error) {
    d.pendingDownloadsMu.RLock()
    defer d.pendingDownloadsMu.RUnlock()
    if len(d.PendingDownloads) == 0 {
        return nil, fmt.Errorf("no ongoing downloads")
    }
    var pendingDownloads []Progress
    for _, prog := range d.PendingDownloads {
        pendingDownloads = append(pendingDownloads, prog)
    }
    return pendingDownloads, nil
}

This function read-locks the PendingDownloads map, retrieves all currently ongoing downloads, and returns their progress as a slice of Progress objects. If there are no downloads, it returns an error.
GetProgress
func (d *Downloader) GetProgress(fileID string) (Progress, error) {
    d.pendingDownloadsMu.RLock()
    defer d.pendingDownloadsMu.RUnlock()
    prog, ok := d.PendingDownloads[fileID]
    if !ok {
        return Progress{}, fmt.Errorf("no ongoing download for file %s", fileID)
    }
    return prog, nil
}

It read-locks the PendingDownloads map and fetches the progress for a specific download by its fileID. If no progress is found, it returns an error.
SetProgress
func (d *Downloader) SetProgress(fileID string, prog Progress) {
    d.pendingDownloadsMu.Lock()
    defer d.pendingDownloadsMu.Unlock()
    if existingProg, ok := d.PendingDownloads[fileID]; ok {
        existingProg.Complete = prog.Complete
        existingProg.Current = prog.Current
        existingProg.EndTime = prog.EndTime
        existingProg.Speed = prog.Speed
        // Write the updated copy back, otherwise the map entry never changes.
        d.PendingDownloads[fileID] = existingProg
    } else {
        d.PendingDownloads[fileID] = prog
    }
}

This function updates the progress of an ongoing download or adds a new one. It locks the map, checks if the file already has an entry, and either updates its fields (writing the modified copy back to the map) or stores the new progress.
DeleteProgress
func (d *Downloader) DeleteProgress(fileID string) {
    d.pendingDownloadsMu.Lock()
    defer d.pendingDownloadsMu.Unlock()
    delete(d.PendingDownloads, fileID)
}

It safely removes the progress information for a specific file by locking the PendingDownloads map and deleting the corresponding entry.
CancelDownload
func (d *Downloader) CancelDownload(fileID string) error {
    cancel, ok := d.cancelFuncs[fileID]
    if !ok {
        return fmt.Errorf("no downloads found to cancel")
    }
    cancel()
    return nil
}

Cancels a specific download by calling the stored cancel function for that file. If no cancel function exists, it returns an error.
CancelAllDownloads
func (d *Downloader) CancelAllDownloads() {
    for _, cancel := range d.cancelFuncs {
        cancel()
    }
}

Loops through all active downloads and cancels each one by invoking their respective cancel functions.
CleanUp
func (d *Downloader) cleanUp(fileID string) {
    close(d.progressChans[fileID])
    close(d.ErrChans[fileID])
    delete(d.progressChans, fileID)
    delete(d.ErrChans, fileID)
    d.pendingDownloadsMu.Lock()
    delete(d.PendingDownloads, fileID)
    d.pendingDownloadsMu.Unlock()
}

After a download completes, this function closes the progress and error channels for the file and removes its tracking from the maps to free resources.
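One small thing to be aware of: the stored cancel function for the file isn't removed here, so the cancelFuncs map keeps growing. A possible addition (not part of the original code) is to release it as well:

// Suggested addition inside cleanUp, after removing the channels:
if cancel, ok := d.cancelFuncs[fileID]; ok {
    cancel() // releases the context's resources; safe to call more than once
    delete(d.cancelFuncs, fileID)
}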
StartDownload
This is the main logic for starting the downloads in batch, updating the progress for each and handling any errors in between.
- Iterating over fileIds: The method loops over d.FileIds, which contains the list of file IDs to be downloaded. For each file, it prepares a progChan for progress updates and an errChan for error reporting, and stores them in the progressChans and ErrChans maps, keyed by the file ID.

for _, fileID := range d.FileIds {
    progChan := make(chan Progress)
    d.progressChans[fileID] = progChan
    errChan := make(chan error)
    d.ErrChans[fileID] = errChan
}

- Cancellation handling: A context.WithCancel(ctx) is created for each file's download. This allows each download to be canceled individually, and the cancel functions are stored in d.cancelFuncs.

downloadCtx, cancel := context.WithCancel(ctx)
d.cancelFuncs[fileID] = cancel

- Starting downloads in dedicated goroutines: The download for each file is initiated in a separate goroutine, ensuring that all downloads run concurrently. The service.GDriveDownloader function is called with the necessary configuration (file ID, destination path, file name, access token) and the progChan for progress updates. If an error occurs during the download, it's sent to errChan (see the sketch after this list for one way to consume these errors), and d.cleanUp(fileID) is called to remove any related state for that file from the downloader.

go func(fileID string) {
    // Don't worry, we'll create the downloader service later.
    err := service.GDriveDownloader(service.DownloaderConfig{
        FileID:          fileID,
        DestinationPath: d.DestinationPath,
        FileName:        fileName,
        AccessToken:     accToken,
    }, progChan, downloadCtx)
    if err != nil {
        log.Errorf("error downloading file %s: %v\n", fileID, err)
        errChan <- err
    }
    d.cleanUp(fileID)
}(fileID)

- Monitoring progress in a separate goroutine: For each file, another goroutine is started to listen to progress updates sent on progChan. The progress is continuously read from the channel and stored using d.SetProgress(fileID, prog) for tracking the file's download progress.

for fileID, progChan := range d.progressChans {
    go func(fileID string, progChan chan Progress) {
        for prog := range progChan {
            d.SetProgress(fileID, prog)
        }
    }(fileID, progChan)
}
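Nothing in this article reads from ErrChans yet, and an unread, unbuffered error channel would block the sending goroutine. Here is a minimal sketch (not part of the original code) of how a caller could drain the per-file error channels, for example to log failures:

// Suggested sketch: consume errors per file in their own goroutines,
// so a failed download never blocks on an unread channel.
for fileID, errChan := range d.ErrChans {
    go func(fileID string, errChan chan error) {
        for err := range errChan {
            log.Errorf("download %s failed: %v", fileID, err)
        }
    }(fileID, errChan)
}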
Full implementation
func (d *Downloader) StartDownload(ctx context.Context, accToken string, fileName string) error {
    for _, fileID := range d.FileIds {
        progChan := make(chan Progress)
        d.progressChans[fileID] = progChan
        errChan := make(chan error)
        d.ErrChans[fileID] = errChan
        downloadCtx, cancel := context.WithCancel(ctx)
        d.cancelFuncs[fileID] = cancel
        // Start multiple downloads in dedicated goroutines
        go func(fileID string) {
            // Don't worry, we'll create the downloader service later.
            err := service.GDriveDownloader(service.DownloaderConfig{
                FileID:          fileID,
                DestinationPath: d.DestinationPath,
                FileName:        fileName,
                AccessToken:     accToken,
            }, progChan, downloadCtx)
            if err != nil {
                log.Errorf("error downloading file %s: %v\n", fileID, err)
                errChan <- err
            }
            d.cleanUp(fileID)
        }(fileID)
    }
    for fileID, progChan := range d.progressChans {
        go func(fileID string, progChan chan Progress) {
            for prog := range progChan {
                d.SetProgress(fileID, prog)
            }
        }(fileID, progChan)
    }
    return nil
}

Working on Download Service
type DownloaderConfig struct {
    FileID          string
    DestinationPath string
    FileName        string
    AccessToken     string
}

// GDriveDownloader downloads a file from Google Drive and tracks progress.
// It uses a 32KB buffer to read the file in chunks, allowing for efficient streaming.
func GDriveDownloader(cfg DownloaderConfig, progChan chan<- types.Progress, ctx context.Context) error {
    // Initialization
    // Refer to https://github.com/nilotpaul/go-downloader/blob/main/util/downloader_util.go
    srv, err := util.MakeGDriveService(ctx, cfg.AccessToken)
    if err != nil {
        return fmt.Errorf("failed to initialize GDrive service: %v", err)
    }
    file, err := srv.Files.Get(cfg.FileID).Do()
    if err != nil {
        return fmt.Errorf("failed to get file info: %v", err)
    }
    if len(cfg.FileName) == 0 {
        cfg.FileName = file.OriginalFilename
    }
    destFile, err := util.CreateFile(cfg.FileName)
    if err != nil {
        return fmt.Errorf("failed to create destination file %s: %v", cfg.FileName, err)
    }
    defer destFile.Close()
    // File Download
    res, err := srv.Files.Get(cfg.FileID).Download()
    if err != nil {
        return fmt.Errorf("failed to download the file: %v", err)
    }
    defer res.Body.Close()
    // Streaming Download with buffer
    buf := make([]byte, 32*1024) // 32KB buffer
    var totalWritten int64
    // Progress Tracking
    prog := &types.Progress{
        FileID:       cfg.FileID,
        Total:        file.FileSize,
        ReadableSize: util.FormatBytes(file.FileSize),
        StartTime:    time.Now(),
    }
    // Send initial progress (a copy, since the channel carries values)
    progChan <- *prog
    for {
        n, err := res.Body.Read(buf)
        select {
        // Cancellation Handling
        case <-ctx.Done():
            log.Infof("download cancelled for %s", cfg.FileID)
            return nil
        default:
            if n > 0 {
                written, writeErr := destFile.Write(buf[0:n])
                if writeErr != nil {
                    return fmt.Errorf("failed to write file content: %v", writeErr)
                }
                totalWritten += int64(written)
                // Update progress
                prog.Current = int(float64(totalWritten) / float64(file.FileSize) * 100)
                elapsedTime := time.Since(prog.StartTime).Seconds()
                if elapsedTime > 0 {
                    speed := (float64(totalWritten) / elapsedTime) / 1e6 // Speed in MB/s
                    prog.Speed = math.Round(speed*100) / 100             // Rounded to two decimal places
                }
                progChan <- *prog
            }
        }
        if err != nil {
            if err == io.EOF {
                break
            }
            return fmt.Errorf("failed to read response body of the file %s: %v", file.OriginalFilename, err)
        }
    }
    // Mark download as complete
    prog.Complete = true
    prog.EndTime = time.Now()
    progChan <- *prog
    return nil
}

Explanation
I know this is a huge chunk of code to take in, but I'll explain every bit of it.
Initialization
The function starts by creating a Google Drive service using the provided access token. It retrieves file information based on the FileID and determines the file name to use for the download. If the FileName is empty, it defaults to the original filename from Google Drive.
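The real util.MakeGDriveService lives in the repo linked above; a possible shape for it, assuming the official Google Drive v2 client and a static OAuth token source, could look like this (a sketch, not the repo's exact code):

package util

import (
    "context"

    "golang.org/x/oauth2"
    drive "google.golang.org/api/drive/v2"
    "google.golang.org/api/option"
)

// MakeGDriveService builds a Drive client that authenticates every request
// with the user's access token.
func MakeGDriveService(ctx context.Context, accessToken string) (*drive.Service, error) {
    ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: accessToken})
    return drive.NewService(ctx, option.WithTokenSource(ts))
}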
File Download
The function calls the Google Drive API to download the file by the fileID retrieved from the download link, returning an error if the download fails. A defer statement ensures the response body is closed once the function exits.
Please check Google APIs Client library for Go.
Streaming Download with Buffer
A 32KB buffer is defined to read the file in chunks. This allows for efficient streaming, minimizing memory usage since it only holds a small portion of the file at any given time. The totalWritten variable tracks the total number of bytes written to the destination file.
Progress Tracking
An initial progress structure is sent to the channel. The function then enters a loop where it continuously reads from the response body. For each chunk read, it writes the data to the destination file, updates the total written bytes, and calculates the current download progress percentage and speed (in MB/s). The progress is sent to the channel after each write operation.
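For example, if totalWritten is 50,000,000 bytes of a 100,000,000-byte file, Current becomes int(0.5 * 100) = 50; if 4 seconds have elapsed, Speed is (50,000,000 / 4) / 1e6 = 12.5 MB/s.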
Cancellation Handling
A select statement allows for checking if the context has been canceled, enabling graceful cancellation of the download.
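Note that because the select has a default branch, cancellation is only noticed between reads; the blocking Read itself isn't interrupted. One way to also abort the underlying HTTP request, assuming the generated Drive client, is to attach the per-download context to the call itself:

// Attaching the context means cancelling it also cancels the HTTP request.
res, err := srv.Files.Get(cfg.FileID).Context(ctx).Download()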
Completion
Once the download is complete (indicated by io.EOF), the function updates the progress to mark the download as complete, sets the end time, and sends the final progress to the channel.
Using Current Logic in HTTP Handlers
Now that everything is set and ready to use, let's build an HTTP handler that will handle download requests.
Creating DownloadHandler
type DownloadHandler struct {
    registry   *store.ProviderRegistry
    downloader *store.Downloader
    env        config.EnvConfig
}

func NewDownloadHandler(registry *store.ProviderRegistry, sessStore *session.Store, env config.EnvConfig) *DownloadHandler {
    return &DownloadHandler{
        registry:   registry,                                   // Our provider registry we created at the beginning.
        env:        env,                                        // Env variables.
        downloader: store.NewDownloader(make([]string, 0), ""), // Our downloader store.
    }
}

func (h *DownloadHandler) DownloadHandler(c *fiber.Ctx) error {
    // Get the input body. Refer to https://github.com/nilotpaul/go-downloader/blob/main/util/downloader_util.go
    b, err := util.ValidateDownloadHRBody(c)
    if err != nil {
        return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
            "error": "Invalid input",
        })
    }
    // Set default destination path if none provided
    if len(b.DestinationPath) == 0 {
        b.DestinationPath = h.env.DefaultDownloadPath
    }
    // Extract file IDs from the provided links
    // Implement this yourself, you need to get the fileId from each GDrive link.
    // Refer to https://github.com/nilotpaul/go-downloader/blob/main/util/downloader_util.go
    fileIDs := util.GetFileIDs(b.Links)
    // Retrieve access token for the current user
    gp, err := h.registry.GetProvider("google")
    if err != nil {
        return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
            "error": "No provider found",
        })
    }
    // Get access token of the currently logged-in account.
    t := gp.GetAccessToken()
    h.downloader.FileIds = fileIDs["file"]
    h.downloader.DestinationPath = b.DestinationPath
    slog.Info("downloading", "GDrive fileIDs: ", fileIDs)
    // Start the download
    if err := h.downloader.StartDownload(c.Context(), t, ""); err != nil {
        return err
    }
    // Respond with the status and file IDs
    return c.JSON(fiber.Map{
        "status":   http.StatusOK,
        "file_ids": fileIDs["file"],
    })
}

Here, we use the provider registry we created earlier, which allows for multiple providers, even though we've implemented just Google for now. By calling .GetProvider("google"), we access all the necessary logic for handling Google-specific tasks. This includes retrieving the access token for the currently logged-in account. We then pass this token to the downloader store along with the file IDs and destination path.
That’s it👍, this process is simple and efficient, making it easy to manage downloads.
Download Progress Handler
func (h *DownloadHandler) ProgressHTTPHandler(c *fiber.Ctx) error {
    pendings, _ := h.downloader.GetPendingDownloads()
    if len(pendings) == 0 {
        return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
            "error": "No pending downloads",
        })
    }
    return c.JSON(pendings)
}

This handler checks for pending downloads in the PendingDownloads map and sends them to the client. If there are no pending downloads, it returns a 404 Not Found status.
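Based on the JSON tags on the Progress struct, a response from this endpoint could look roughly like the following. The values are made up for illustration, and the exact readableSize string depends on util.FormatBytes:

[
  {
    "file_id": "1AbCdEfGhIjKl",
    "total": 104857600,
    "current": 42,
    "complete": false,
    "readableSize": "100 MB",
    "startTime": "2024-05-01T10:00:00Z",
    "endTime": "0001-01-01T00:00:00Z",
    "speed": 12.5
  }
]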
Cancel Download Handler
func (h *DownloadHandler) CancelDownloadHandler(c *fiber.Ctx) error {
    // Grab the target file ID from the request.
    // (Assumption for this snippet: it's passed as a query parameter; adapt to however your route sends it.)
    fileID := c.Query("fileId")
    // Check if there is ongoing progress for the specified fileID
    if _, err := h.downloader.GetProgress(fileID); err != nil {
        return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
            "error": "No ongoing downloads",
        })
    }
    if err := h.downloader.CancelDownload(fileID); err != nil {
        return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
            "error": fmt.Sprintf("Failed to cancel the download for file %s", fileID),
        })
    }
    return c.JSON(fiber.Map{"status": "OK"})
}

This handler is pretty simple: we check for ongoing progress for a fileID; if that returns an error, we know there's no pending download for that file. Otherwise, we call CancelDownload, passing it the fileID.
For cancelling all downloads at once:
func (h *DownloadHandler) CancelAllDownloadsHandler(c *fiber.Ctx) error {
    h.downloader.CancelAllDownloads()
    return c.JSON("OK")
}

Registering Handlers in route.go
func (h *Router) RegisterRoutes(r fiber.Router) {
    r.Get("/healthcheck", func(c *fiber.Ctx) error {
        return c.JSON("OK")
    })
    // Google OAuth endpoints
    // Implement this yourself or refer to https://github.com/nilotpaul/go-downloader/blob/main/api/route.go
    // Download endpoints
    r.Post("/download", sessionMW.SessionMiddleware, sessionMW.WithGoogleOAuth, downloadHR.DownloadHandler)
    r.Post("/cancel", sessionMW.SessionMiddleware, sessionMW.WithGoogleOAuth, downloadHR.CancelDownloadHandler)
    r.Post("/cancelAll", sessionMW.SessionMiddleware, sessionMW.WithGoogleOAuth, downloadHR.CancelAllDownloadsHandler)
    r.Get("/progress", sessionMW.SessionMiddleware, downloadHR.ProgressHTTPHandler)
}

Wrapping Up
That's it from me but not from the Go Downloader. While this article covered the essential parts and the main logic behind the downloader, you can explore additional features that are already implemented, such as:
- A client that automatically reconnects during progress streaming after a page refresh
- A full folder tree for selecting download destinations
- The ability to download entire folders at once
- Easy self-hosting with Docker installation
Feel free to check out the full functionality and try it out for yourself on my GitHub. If you get stuck, don't forget to check out the previous article (Part 1) for additional context. Happy downloading!
