使用 Go 并发功能将联系人从 CSV 迁移到数据库

用例
假设您有一个包含大量联系人信息的 CSV 文件,需要将其迁移到数据库。这可能是用于客户管理系统、电子邮件营销平台或任何其他必须有效存储和管理联系人详细信息的应用程序。我们可以利用 Go 中的并发来加快这一过程,而不是逐个处理每个联系人,因为这可能很慢且效率低下。并发让我们可以同时处理多个联系人,从而使迁移过程更快。

什么是并发?
并发性是指程序同时执行多项任务的能力。它并不一定意味着同时执行多项任务(即并行性),而是允许任务在重叠时间段内执行。并发性在需要多任务处理的应用程序中特别有用,例如处理多个客户端请求的 Web 服务器或数据处理管道。
并发工作原理:详细说明

并发允许系统通过交错方式同时处理多个任务,使任务看起来像是同时发生的。让我们深入研究实现并发的机制,并用现实生活中的例子来说明它们。

1. 线程
线程是操作系统可以调度的最小处理单元。它们在进程内共享相同的内存空间,这允许线程之间快速通信,但也需要仔细管理以避免竞争条件等问题。

  • 示例:假设一个 Web 服务器需要同时处理多个客户端请求。每个请求由单独的线程处理。这允许服务器同时处理多个请求,从而提高响应时间和总体吞吐量。
  • 现实生活中的类比:想象一个餐厅厨房,其中多个厨师(线程)同时准备不同的菜肴。他们共享相同的厨房空间(内存),每个厨师的工作顺序(任务)不同,但他们需要协调以避免相互碰撞(竞争条件)。

2. 流程
进程是独立执行的程序,在单独的内存空间中运行。每个进程都有自己的内存和资源,这意味着它们不会互相干扰。但是,进程间通信通常比同一进程内线程之间的通信慢。

  • 示例:想象一下,在您的计算机上同时运行文字处理器和 Web 浏览器。每个应用程序都作为单独的进程运行。它们独立运行,如果其中一个崩溃,不会影响另一个。
  • 现实生活中的类比:想象一下两家独立的公司在从事不同的项目。每家公司都在自己的建筑(内存空间)中运营,并拥有自己的资源。如果一家公司遇到问题,不会直接影响另一家公司。

3.事件循环
事件循环是一种编程结构,用于在程序中等待并分派事件或消息。它们在循环中处理多个事件或任务,通常用于异步编程模型。

  • 示例:浏览器中运行的基于 JavaScript 的 Web 应用程序使用事件循环来处理用户交互,如点击、键盘输入和网络请求。事件循环允许应用程序在事件发生时执行回调函数来保持响应。
  • 现实生活中的类比:假设一家杂货店的收银员每次只接待一位顾客。收银员等待顾客到来,处理他们的交易,然后转向下一位顾客。即使每次只接待一位顾客,收银员也可以在顾客到达时快速切换。
4. 协程
协程是可以在特定时间点暂停执行并稍后恢复的函数。这样其他函数就可以同时运行,从而促进协作式多任务处理。协程会主动放弃控制权,这可以使它们在某些情况下比抢占式调度的线程更高效。
  • 示例async:在 Python 中,协程与和关键字一起使用await来执行异步操作,例如从服务器获取数据而不阻塞主程序流。
  • 现实生活中的类比:想象一位同时处理多项任务的厨师,他同时准备多道菜。这位厨师可能会开始烧水(任务 1),然后在等待水沸腾的同时开始切菜(任务 2)。水沸腾后,厨师会切换回任务 1。厨师会根据需要暂停和恢复任务,确保高效利用时间。

Go 中的并发如何工作?
Go 通过其内置功能:goroutines 和 channels 使并发变得简单而高效。

  • Goroutines:这些是由 Go 运行时管理的轻量级线程。它们允许您并发运行函数。启动 goroutine 非常简单,只需go在函数调用前添加关键字即可。
  • 通道channel:通道为 goroutine 提供了一种相互通信并同步其执行的方法。通道可用于在 goroutine 之间发送和接收值。


现实生活中的例子:管理一家繁忙的餐馆
想象一下,你是一家繁忙餐厅的经理,你需要确保一切顺利高效地运行。以下是 Go 中的并发与经营餐厅的比较:

1、Goroutines 作为 Chef:
在你的餐厅里,厨房里有多个厨师。每个厨师可以同时处理不同的订单。在 Go 中,这些厨师就像 goroutine。你可以同时运行多个 goroutine,每个 goroutine 处理不同的任务,例如处理客户的订单。

2.、订单渠道:
为了高效地管理订单,您可以使用订单单在服务员和厨师之间进行沟通。服务员将顾客的订单写在订单单上并将其放在板上。厨师拿起订单单,准备菜肴,然后在订单准备好时通知服务员。在 Go 中,通道的工作方式类似,允许 goroutine 发送和接收消息,确保它们高效地协同工作。

3.、WaitGroup 作为经理的检查表:
作为经理,您需要保留一份清单,以确保所有订单在班次结束前完成。您将每个订单的一项添加到清单中,并在订单完成后将其勾掉。在 Go 中,sync.WaitGroup用于此目的。等待所有 goroutine 完成任务后再继续执行会很有帮助。

详细场景
让我们用这些并发概念来分析一下餐厅里一个典型的繁忙夜晚:

  • 点菜(启动 Goroutines):多个顾客到达并点菜。服务员(主函数)通过​​将订单写在单子上来开始新订单(启动 goroutine)。每个订单由不同的厨师(goroutines)独立处理,允许同时处理多个订单。
  • 准备食物(并发执行任务):每个厨师拿着订单单(传递给 goroutine 的数据)并开始准备菜肴。他们同时处理不同的订单,高效利用厨房空间和资源。
  • 订单沟通(使用渠道):菜品做好后,厨师将完成的订单放在柜台上并通知服务员(通过渠道发送消息)。服务员拿起订单并将其提供给顾客。这确保订单得到有效处理和交付。
  • 一天结束时的检查(使用 WaitGroup):一天结束时,经理(主函数)通过​​检查清单(使用sync.WaitGroup)确保所有订单都已完成。只有当所有订单都已完成(所有 goroutine 都已完成)时,经理才会关闭餐厅。


详细代码:

package main

import (
 "encoding/csv"
 
"fmt"
 
"log"
 
"os"
 
"regexp"
 
"sync"
)

// Define the struct to hold each row's data
type Contact struct {
 Name   string
 Mobile string
 Email  string
}

type FailedRow struct {
 Name        string
 Mobile      string
 Email       string
 ErrorReason string
 Succeed     bool
}

type FinalResponse struct {
 Status    string      <code>json:
"status"</code>
 FailedRow []FailedRow <code>json:
"failed_row"</code>
}

func main() {
 
// Open the CSV file
 file, err := os.Open(
"contacts.csv")
 if err != nil {
  log.Fatal(err)
 }
 defer file.Close()

 
// Create a new CSV reader
 reader := csv.NewReader(file)

 
// Read all the CSV records
 records, err := reader.ReadAll()
 if err != nil {
  log.Fatal(err)
 }

 
// Slice to hold all the contacts
 var contacts []Contact

 
// Iterate through the records
 for i, record := range records {
 
// Skip the header row (if present)
  if i == 0 {
   continue
  }

 
// Map the record to the Contact struct
  contact := Contact{
   Name:   record[0],
   Mobile: record[1],
   Email:  record[2],
  }

 
// Append the contact to the slice
  contacts = append(contacts, contact)
 }

 maxWorkers := 10
 inputCh := make(chan Contact, len(contacts))
 errorCh := make(chan FailedRow, len(contacts))
 var wg sync.WaitGroup
 for i := 0; i < maxWorkers; i++ {
  go HandleContactMigration(inputCh, errorCh, &wg)
 }
 for i := 0; i < len(contacts); i++ {
  wg.Add(1)
  inputCh <- contacts<i>
 }
 close(inputCh)

 totalError := make([]FailedRow, 0)
 go func() {
  for failedMigration := range errorCh {
   if !failedMigration.Succeed {
    totalError = append(totalError, failedMigration)
   }
   wg.Done()
  }
 }()
 wg.Wait()
 close(errorCh)
 resp := FinalResponse{}
 if len(totalError) == 0 {
  resp.Status =
"SUCCEED"
 } else if len(totalError) == len(contacts) {
  resp.Status =
"FAILED"
  resp.FailedRow = totalError
 } else {
  resp.FailedRow = totalError
  resp.Status =
"PARTIAL_SUCCEED"
 }
 
//return response
 fmt.Println(resp)
}

func HandleContactMigration(rows <-chan Contact, errorCh chan FailedRow, wg *sync.WaitGroup) {
 for row := range rows {
  errorResp := FailedRow{
   Name:    row.Name,
   Mobile:  row.Mobile,
   Email:   row.Email,
   Succeed: false,
// Default false
  }
  err := MigrateSingleRow(row)
  if err != nil {
   fmt.Println(err)
   errorResp.ErrorReason = err.Error()
   errorCh <- errorResp
   continue
  }
  errorResp.Succeed = true
  errorCh <- errorResp
 }
}

func MigrateSingleRow(row Contact) error {
 if !isValidEmail(row.Email) {
  return fmt.Errorf(
"Invalid email")
 }
 if !isValidMobile(row.Mobile) {
  return fmt.Errorf(
"Invalid mobile")
 }
 return SaveToDb(row)
}

func SaveToDb(row Contact) error {
 
// Your code to save contact to your Database and return error if any
 return nil
}

func isValidMobile(phone string) bool {
 if len(phone) != 10 {
  return false
 }
 fmt.Println(string(phone[0]))
 if string(phone[0]) ==
"5" || string(phone[0]) == "6" || string(phone[0]) == "7" || string(phone[0]) == "8" || string(phone[0]) == "9" {
  return true
 }
 return false
}

func isValidEmail(email string) bool {
 
// Define a regular expression for a valid email
 var emailRegex = regexp.MustCompile(<code>^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$</code>)
 return emailRegex.MatchString(email)
}


此 Go 程序从 CSV 文件读取联系人信息,验证数据并并发处理。

让我们逐步分解代码以了解其工作原理。

步骤 1:导入必要的包

import (
    "encoding/csv"
   
"fmt"
   
"log"
   
"os"
   
"regexp"
   
"sync"
)
  • encoding/csv:读取CSV文件。
  • fmt:用于格式化的 I/O 操作。
  • log:用于记录错误。
  • os:用于处理文件操作。
  • regexp:对于正则表达式,用于验证电子邮件。
  • sync:使用 WaitGroups 同步 goroutines。

步骤 2:定义数据结构

type Contact struct {
    Name   string
    Mobile string
    Email  string
}
type FailedRow struct {
    Name        string
    Mobile      string
    Email       string
    ErrorReason string
    Succeed     bool
}
type FinalResponse struct {
    Status    string      <code>json:"status"</code>
    FailedRow []FailedRow <code>json:
"failed_row"</code>
}
  • Contact:保存个人联系方式。
  • FailedRow:包含验证或保存失败的联系人的详细信息。
  • FinalResponse:收集最终状态和失败行的列表以供输出。

步骤3:主要功能

func  main () {
    file, err := os.Open( "contacts.csv" )
     if err != nil {
        log.Fatal(err)
    }
     defer file.Close()

    reader := csv.NewReader(file)
    records, err := reader.ReadAll()
     if err != nil {
        log.Fatal(err)
    }

     var contacts []Contact

     for i, record := range records {
         if i == 0 {
             continue
         }

        contact := Contact{
            Name: record[ 0 ],
            Mobile: record[ 1 ],
            Email: record[ 2 ],
        }

        contacts = append (contacts, contact)
    }
  • 打开contacts.csv文件。
  • 从 CSV 中读取所有记录。
  • 跳过标题行并将每条记录映射到Contact结构。
  • 将每个联系人附加到contacts切片。


步骤 4:设置并发

maxWorkers := 10
     inputCh := make ( chan Contact,len (contacts)) 
    errorCh := make ( chan FailedRow,len (contacts)) 
    var wg sync.WaitGroup 

    for i := 0 ; i < maxWorkers; i++ { 
        go HandleContactMigration(inputCh, errorCh, &wg) 
    } 
    
    for i := 0 ; i < len (contacts); i++ { 
        wg.Add( 1 ) 
        inputCh <- contacts<i> 
    } 
    close (inputCh)
  • maxWorkers:定义工作 goroutine 的数量。您可以根据需要进行设置。
  • inputCh:向工作 goroutine 发送联系人的通道。
  • errorCh:接收迁移失败详情的渠道。
  • WaitGroup:同步 goroutines 以确保所有任务在继续之前都已完成。
  • 启动maxWorkers多个 goroutine 来同时处理联系人迁移。
  • 将每个联系发送到inputCh通道并增加 WaitGroup 计数器。
  • inputCh联系人全部发送完毕后关闭。


步骤 5:收集错误
 

  totalError := make ([]FailedRow,0 ) 
    go  func () { 
        for failedMigration := range errorCh { 
            if !failedMigration.Succeed { 
                totalError = append (totalError,failedMigration) 
            } 
            wg.Done() 
        } 
    }() 
    wg.Wait() 
    close (errorCh)
  • 启动一个 goroutine 来从errorCh通道中收集失败的迁移。
  • 将所有失败的迁移附加到totalError切片。
  • 每次处理错误时减少 WaitGroup 计数器。
  • 等待所有 goroutines 完成使用wg.Wait()。
  • 处理完毕后关闭errorCh通道。

第 6 步:最终回应

 resp := FinalResponse{}
    if len(totalError) == 0 {
        resp.Status = "SUCCEED"
    } else if len(totalError) == len(contacts) {
        resp.Status =
"FAILED"
        resp.FailedRow = totalError
    } else {
        resp.FailedRow = totalError
        resp.Status =
"PARTIAL_SUCCEED"
    }
    fmt.Println(resp)
}

  • 构造FinalResponse。
  • 根据失败的迁移次数设置状态。
  • 打印最终响应。


辅助函数
1.处理联系人迁移

func  HandleContactMigration (rows <- chan Contact, errorCh chan FailedRow, wg *sync.WaitGroup) {
     for row := range rows {
        errorResp := FailedRow{
            Name: row.Name,
            Mobile: row.Mobile,
            Email: row.Email,
            Succeed: false , // 默认 false
         }
        err := MigrateSingleRow(row)
         if err != nil {
            fmt.Println(err)
            errorResp.ErrorReason = err.Error()
            errorCh <- errorResp
             continue
         }
        errorResp.Succeed = true
         errorCh <- errorResp
    }
}
  • 处理来自inputCh渠道的每个联系人。
  • FailedRow使用默认值初始化Succeed为false。
  • 调用MigrateSingleRow来处理验证和保存。
  • 将结果发送至errorCh。
2. MigrateSingleRow

func MigrateSingleRow (row Contact) error { 
    if ! isValidEmail (row.Email) { 
        return fmt .Errorf ("无效电子邮件"
    } 
    if ! isValidMobile (row.Mobile) { 
        return fmt .Errorf (
"无效手机"
    } 
    return SaveToDb (row) 
}
  • 使用辅助功能验证电子邮件和手机。
  • 如果验证失败则返回错误。
  • 拨打电话SaveToDb保存联系人。
3. 保存至数据库

func  SaveToDb (row Contact)  error {
     // 你的代码将联系人保存到你的数据库并返回错误(如果有)
    return  nil
 }

用于保存联系人到数据库的占位符。

4. isValidMobile 和 isValidEmail

func  isValidMobile (phone string )  bool {
     if  len (phone) != 10 {
         return  false
     }
    fmt.Println( string (phone[ 0 ]))
     if  string (phone[ 0 ]) == "5" || string (phone[ 0 ]) == "6" || string (phone[ 0 ]) == "7" || string (phone[ 0 ]) == "8" || string (phone[ 0 ]) == "9" {
         return  true
     }
     return  false
 }
 func  isValidEmail (email string )  bool {
     var emailRegex = regexp.MustCompile( <code>^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$</code> )
     return emailRegex.MatchString(email)
}
  • 检查手机号码是否长度为 10 位且以有效数字开头。
  • 使用正则表达式来验证电子邮件格式。

响应示例:

{
    "status": "PARTIAL_SUCCEED",
   
"failed_row": [
        {
           
"Name": "Rahul Gupta",
           
"Mobile": "1234567890",
           
"Email": "rahul.gupta@example.com",
           
"ErrorReason": "Invalid mobile",
           
"Succeed": false
        },
        {
           
"Name": "Mohit Sharma",
           
"Mobile": "9876543210",
           
"Email": "priya.sharma@invalid",
           
"ErrorReason": "Invalid email",
           
"Succeed": false
        }
    ]
}

我在上面的代码中使用了工作池或任务池模式。

工作池并发模式解释
工作线程池模式涉及一组工作线程 goroutine,它们同时执行来自队列或通道的任务。它旨在管理固定数量的工作线程(goroutine),以高效处理可能大量的任务(作业)。

工作池模式的优点:

  • 资源管理:限制并发 goroutine 的数量,防止资源耗尽。
  • 效率:确保任务同时处理,最大限度地提高 CPU 利用率。
  • 可扩展性:maxWorkers根据系统容量,通过改变工人数量()可轻松调整。

概括
此 Go 程序演示了如何使用并发来高效处理任务。通过使用 goroutine 和通道,程序可以并发读取、验证和处理来自 CSV 文件的联系信息,确保快速高效地完成任务。使用sync.WaitGroup有助于在生成最终响应之前同步所有任务的完成,最终响应可以指示所有任务是成功、失败还是部分成功。

结论
通过利用 Go 的并发功能,我们高效地将联系人从 CSV 文件迁移到数据库,同时处理多个联系人,大大加快了流程。此方法可轻松适应处理其他类型的批量数据迁移或处理任务。

https://github.com/aditya14as/Concurrency-in-Go/blob/master/main.go