Concurrently Importing CSV Files into a Database with Go

Posted by admin on 2026-06-26 07:02:05. Source: ZONE.CI 全球网

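Summary: This article addresses the performance problems of importing large CSV files into a database with Go and proposes a staged, concurrent pipeline. The flow is split into three stages (read the CSV, clean the data, insert in batches), and resource usage is kept in check with a fixed number of workers, a buffered channel, and batched inserts. Key practices include setting connection pool parameters (at most 12 connections), inserting 500 rows per batch, and using ON CONFLICT to update duplicate rows. It also stresses comparing the read rate with the insert rate to locate the bottleneck.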



Original article by go go (Go语言教程), published 2026-06-23 13:21, Shaanxi


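I was importing an 80 MB CSV file. The endpoint had not returned after 3 minutes, the database CPU was nowhere near saturated, and the Go process's memory just kept climbing.

The line that stood out most in the log was this one: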

import file=order_202406.csv read=1830000 insert=1821000 cost=179s err=context deadline exceeded

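With this kind of problem I usually don't suspect the database first. If the database isn't saturated, the slowness isn't necessarily slow SQL. More likely the program read the whole CSV into memory in one go, or it spawned goroutines so freely that the connection pool was overwhelmed.

When importing CSV into a database, many people write a first version like this: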

rows, _ := reader.ReadAll()
for _, row := range rows {
    db.Exec("insert into xxx ...", row[0], row[1])
}

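This works fine on small files and shows its true colors as soon as it meets a production-sized file.

ReadAll() pulls the entire file into memory. One insert per row is more than the database can take. Wrap that in a goroutine per row, and you have basically written the incident into the code.

I prefer to split the job into three stages: read the CSV, clean the fields, and insert in batches. Channels connect the stages, and the number of workers is fixed so it can't run away.

The table looks roughly like this: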

create table order_import (
    order_no    varchar(64) primary key,
    user_id     bigint not null,
    amount      decimal(12,2) not null,
    paid_at     timestamp null,
    source_file varchar(128) not null
);

For the Go code I'll only show the key parts rather than dump a complete project.

type OrderRow struct {
    OrderNo string
    UserID  int64
    Amount  string
    PaidAt  sql.NullTime
    File    string
}

func readCSV(ctx context.Context, fileName string, out chan<- OrderRow) error {
    f, err := os.Open(fileName)
    if err != nil {
        return err
    }
    defer f.Close()

    r := csv.NewReader(bufio.NewReaderSize(f, 1024*1024))
    r.FieldsPerRecord = -1 // allow a variable field count; parseOrder checks the length

    // Skip the header row
    if _, err := r.Read(); err != nil {
        return err
    }

    // line counts records, header included; it equals the physical line
    // number only when no quoted field contains a newline.
    line := 1
    for {
        rec, err := r.Read()
        if err == io.EOF {
            return nil
        }
        line++ // count the record first so read errors and skips report the same number
        if err != nil {
            return fmt.Errorf("read csv line=%d: %w", line, err)
        }

        row, ok := parseOrder(rec, filepath.Base(fileName))
        if !ok {
            log.Printf("skip bad line=%d data=%v", line, rec)
            continue
        }

        select {
        case out <- row:
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

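Two small details here.

The first is bufio.NewReaderSize. Don't underestimate it. The default 4 KB buffer does work for large files, it's just that throughput will look bad.

The second is that bad data should not kill the whole job. As long as a CSV comes from an external system, it will contain dirty fields. Empty amounts, timestamps in several formats, Chinese characters mixed into user IDs: none of this is unusual.

Field cleaning is pulled out into its own function: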

func parseOrder(rec []string, file string) (OrderRow, bool) {
    if len(rec) < 4 {
        return OrderRow{}, false
    }

    uid, err := strconv.ParseInt(strings.TrimSpace(rec[1]), 10, 64)
    if err != nil || uid <= 0 {
        return OrderRow{}, false
    }

    amount := strings.TrimSpace(rec[2])
    if amount == "" {
        return OrderRow{}, false
    }

    var paid sql.NullTime
    t := strings.TrimSpace(rec[3])
    if t != "" {
        tt, err := time.Parse("2006-01-02 15:04:05", t)
        if err != nil {
            return OrderRow{}, false
        }
        paid = sql.NullTime{Time: tt, Valid: true}
    }

    return OrderRow{
        OrderNo: strings.TrimSpace(rec[0]),
        UserID:  uid,
        Amount:  amount,
        PaidAt:  paid,
        File:    file,
    }, true
}

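Now for the insert side.

The easiest mistake here is starting 50 workers against a connection pool capped at 10. A pile of goroutines ends up blocked waiting for a connection. It looks like high concurrency, but they are all just queueing.

I usually fix the connection pool size first, then keep the worker count less than or equal to the pool size.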

func openDB(dsn string) (*sql.DB, error) {
    db, err := sql.Open("postgres", dsn)
    if err != nil {
        return nil, err
    }

    db.SetMaxOpenConns(12)
    db.SetMaxIdleConns(6)
    db.SetConnMaxLifetime(30 * time.Minute)

    // If the first ping fails, close the pool instead of returning a handle nobody will close
    if err := db.Ping(); err != nil {
        db.Close()
        return nil, err
    }
    return db, nil
}

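Don't let batches grow too large. A few hundred to a thousand rows per batch is enough. Beyond that the SQL gets long, transactions run long, and neither locks nor WAL look good.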

func insertBatch(ctx context.Context, db *sql.DB, rows []OrderRow) error {
    if len(rows) == 0 {
        return nil
    }

    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    stmt, err := tx.PrepareContext(ctx, `
        insert into order_import(order_no, user_id, amount, paid_at, source_file)
        values($1,$2,$3,$4,$5)
        on conflict(order_no) do update set
            user_id = excluded.user_id,
            amount = excluded.amount,
            paid_at = excluded.paid_at,
            source_file = excluded.source_file
    `)
    if err != nil {
        return err
    }
    defer stmt.Close()

    for _, r := range rows {
        if _, err := stmt.ExecContext(ctx, r.OrderNo, r.UserID, r.Amount, r.PaidAt, r.File); err != nil {
            return fmt.Errorf("insert order_no=%s: %w", r.OrderNo, err)
        }
    }

    return tx.Commit()
}

The worker section is where the pacing is actually controlled:

func importCSV(ctx context.Context, db *sql.DB, file string) error {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    const workerN = 8
    const batchSize = 500

    rowCh := make(chan OrderRow, 2000)
    errCh := make(chan error, 1)

    // fail records only the first error and cancels everything else.
    // The send never blocks, so no goroutine can get stuck reporting an error.
    fail := func(err error) {
        select {
        case errCh <- err:
        default:
        }
        cancel()
    }

    go func() {
        defer close(rowCh)
        if err := readCSV(ctx, file, rowCh); err != nil {
            fail(err)
        }
    }()

    var wg sync.WaitGroup
    for i := 0; i < workerN; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()

            buf := make([]OrderRow, 0, batchSize)
            flush := func() bool {
                if err := insertBatch(ctx, db, buf); err != nil {
                    fail(fmt.Errorf("worker=%d: %w", workerID, err))
                    return false
                }
                buf = buf[:0]
                return true
            }

            for r := range rowCh {
                buf = append(buf, r)
                if len(buf) >= batchSize && !flush() {
                    return
                }
            }

            // Flush the final partial batch
            if len(buf) > 0 {
                flush()
            }
        }(i)
    }

    // Always wait for the workers, so no insert is still running after we return.
    // The reader reports its error before closing rowCh, so it is visible here.
    wg.Wait()
    select {
    case err := <-errCh:
        return err
    default:
        return nil
    }
}

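This code doesn't look fancy, but for production imports this is the kind I trust more.

Only one goroutine reads the file, which avoids contention on the disk. The number of insert workers is fixed, which keeps the connection pool from being blown out. The channel is buffered, but not unbounded, so when the downstream is slow the upstream naturally stops. That is backpressure.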
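To make the backpressure point concrete, here is a small self-contained sketch, separate from the importer. The function names are made up for illustration. The first function shows that a buffered channel accepts exactly its capacity before a sender would have to wait. The second shows that a producer writing through a small buffer still delivers everything, just at the consumer's pace.

```go
package main

import "fmt"

// fillWithoutConsumer counts how many sends succeed with nobody receiving.
// The non-blocking select stands in for "the producer would block here".
func fillWithoutConsumer(capacity int) int {
	ch := make(chan int, capacity)
	sent := 0
	for {
		select {
		case ch <- sent:
			sent++
		default:
			return sent // channel is full: a plain send would park the producer
		}
	}
}

// sumThroughPipeline pushes n values through a channel of the given capacity.
// The producer can never run more than `capacity` items ahead of the consumer.
func sumThroughPipeline(n, capacity int) int {
	ch := make(chan int, capacity)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- i // blocks whenever the buffer is full
		}
	}()
	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(fillWithoutConsumer(2000))   // same capacity as rowCh in importCSV
	fmt.Println(sumThroughPipeline(10, 2))   // all ten values arrive despite the tiny buffer
}
```

In importCSV the same mechanism means memory is bounded by roughly the channel capacity plus one batch per worker, no matter how large the file is.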

Before running it I also add a few log lines, otherwise when something goes wrong all you can do is guess:

log.Printf("import start file=%s worker=%d batch=%d", file, workerN, batchSize)

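If the logs show reading is fast and inserting is slow, look at the SQL, indexes, and lock waits.

If inserting is fast and the whole thing is still slow, then look at CSV parsing, disk I/O, and network drives.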
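Comparing the two rates requires counting both sides. One way to get those numbers, sketched under assumptions: the progress type and its snapshot method are hypothetical, and atomic.Int64 needs Go 1.19 or later. The sample figures reuse the read and insert counts from the log line at the top of the article.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// progress holds counters that the reader and the workers can update
// concurrently without a mutex.
type progress struct {
	read     atomic.Int64 // rows sent into the channel by the reader
	inserted atomic.Int64 // rows committed by the workers
}

// snapshot formats the counters as one log line. backlog is the number of
// rows read but not yet committed; if it keeps growing, inserting is the slow side.
func (p *progress) snapshot(file string) string {
	r, i := p.read.Load(), p.inserted.Load()
	return fmt.Sprintf("import progress file=%s read=%d insert=%d backlog=%d", file, r, i, r-i)
}

func main() {
	var p progress
	p.read.Add(1830000)
	p.inserted.Add(1821000)
	fmt.Println(p.snapshot("order_202406.csv"))
	// prints: import progress file=order_202406.csv read=1830000 insert=1821000 backlog=9000
}
```

In the importer, the reader would call p.read.Add(1) after each successful send, each worker would call p.inserted.Add(int64(len(buf))) after a successful insertBatch, and a goroutine with a time.Ticker would log p.snapshot(file) every few seconds until the context is done.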

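Don't start by tuning the goroutine count. More concurrency is not automatically faster. In a CSV-to-database job, what ends up being the bottleneck is usually not Go but database connections, transaction commits, and disk flushes. What Go can do is keep the pace under control and not run itself to death first.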



Reposted from: Go语言教程, go go, "Concurrently Importing CSV Files into a Database with Go"
