文章总结: 文档针对Go语言处理大CSV文件导入数据库的性能问题,提出分阶段并发处理方案。核心方案将流程拆解为读取CSV、数据清洗、批量入库三阶段,通过固定数量worker、有缓冲channel和批量插入控制资源消耗。关键实践包括设置连接池参数(最大12连接)、每批500行入库、使用ONCONFLICT更新重复数据,并强调需监控读取/入库速率比来定位瓶颈。 综合评分: 85 文章分类: 安全开发,应用安全,数据安全,技术标准,其他
用 Go 语言 并发处理 CSV 文件到数据库
原创
go go
Go语言教程
2026年6月23日 13:21 陕西
在小说阅读器读本章
去阅读
导入一个 80M 的 CSV,接口 3 分钟没返回,数据库 CPU 没满,Go 进程内存倒是一路往上爬。
日志里最扎眼的是这一行:
import file=order_202406.csv read=1830000 insert=1821000 cost=179s err=context deadline exceeded
这类问题我一般不先怀疑数据库。数据库没满,慢不一定是 SQL 慢,很可能是程序把 CSV 一口气读进内存,或者 goroutine 开得太豪放,把连接池打穿了。
CSV 导数据库,很多人第一版会这么写:
rows, _ := reader.ReadAll()
for _, row := range rows {
db.Exec("insert into xxx ...", row[0], row[1])
}
这个写法小文件没事,一到线上文件就开始现原形。
ReadAll() 会把整份文件吃进内存;一行一条 insert,数据库也受不了;再套一层 goroutine,每行开一个,基本就是把事故写进代码里。
我更愿意把它拆成三段:读 CSV、清洗字段、批量入库。中间用 channel 串起来,worker 数量固定,别让它失控。
表大概这样:
create table order_import (
order_no varchar(64) primary key,
user_id bigint not null,
amount decimal(12,2) not null,
paid_at timestamp null,
source_file varchar(128) not null
);
Go 代码只贴关键段,不搞一坨完整工程。
type OrderRow struct {
OrderNo string
UserID int64
Amount string
PaidAt sql.NullTime
File string
}
func readCSV(ctx context.Context, fileName string, out chan<- OrderRow) error {
f, err := os.Open(fileName)
if err != nil {
return err
}
defer f.Close()
r := csv.NewReader(bufio.NewReaderSize(f, 1024*1024))
r.FieldsPerRecord = -1
// 跳过表头
if _, err := r.Read(); err != nil {
return err
}
line := 1
for {
rec, err := r.Read()
if err == io.EOF {
return nil
}
if err != nil {
return fmt.Errorf("read csv line=%d: %w", line, err)
}
line++
row, ok := parseOrder(rec, filepath.Base(fileName))
if !ok {
log.Printf("skip bad line=%d data=%v", line, rec)
continue
}
select {
case out <- row:
case <-ctx.Done():
return ctx.Err()
}
}
}
这里有两个小细节。
第一个是 bufio.NewReaderSize,别小看这个。默认 buffer 对大文件不是不能用,只是吞吐会难看。
第二个是坏数据不要直接把任务打死。CSV 这种东西,只要来自外部系统,就一定会有脏字段。金额空字符串、时间格式多个版本、用户 ID 混进中文,这都不稀奇。
清洗字段单独拎出来:
func parseOrder(rec []string, file string) (OrderRow, bool) {
if len(rec) < 4 {
return OrderRow{}, false
}
uid, err := strconv.ParseInt(strings.TrimSpace(rec[1]), 10, 64)
if err != nil || uid <= 0 {
return OrderRow{}, false
}
amount := strings.TrimSpace(rec[2])
if amount == "" {
return OrderRow{}, false
}
var paid sql.NullTime
t := strings.TrimSpace(rec[3])
if t != "" {
tt, err := time.Parse("2006-01-02 15:04:05", t)
if err != nil {
return OrderRow{}, false
}
paid = sql.NullTime{Time: tt, Valid: true}
}
return OrderRow{
OrderNo: strings.TrimSpace(rec[0]),
UserID: uid,
Amount: amount,
PaidAt: paid,
File: file,
}, true
}
再看入库。
这里最容易犯的错是:worker 开 50 个,数据库连接池最大 10 个。结果一堆 goroutine 堵在拿连接上,看起来并发很高,其实都在排队。
我一般先把连接池写死,再让 worker 数量小于等于连接池。
func openDB(dsn string) (*sql.DB, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}
db.SetMaxOpenConns(12)
db.SetMaxIdleConns(6)
db.SetConnMaxLifetime(30 * time.Minute)
return db, db.Ping()
}
批量插入不要攒太大。几百到一千行一批就够了,太大了 SQL 长、事务久,锁和 WAL 都不好看。
func insertBatch(ctx context.Context, db *sql.DB, rows []OrderRow) error {
if len(rows) == 0 {
return nil
}
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
stmt, err := tx.PrepareContext(ctx, `
insert into order_import(order_no, user_id, amount, paid_at, source_file)
values($1,$2,$3,$4,$5)
on conflict(order_no) do update set
user_id = excluded.user_id,
amount = excluded.amount,
paid_at = excluded.paid_at,
source_file = excluded.source_file
`)
if err != nil {
return err
}
defer stmt.Close()
for _, r := range rows {
if _, err := stmt.ExecContext(ctx, r.OrderNo, r.UserID, r.Amount, r.PaidAt, r.File); err != nil {
return fmt.Errorf("insert order_no=%s: %w", r.OrderNo, err)
}
}
return tx.Commit()
}
worker 这段才是控制节奏的地方:
func importCSV(ctx context.Context, db *sql.DB, file string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
rowCh := make(chan OrderRow, 2000)
errCh := make(chan error, 1)
go func() {
defer close(rowCh)
if err := readCSV(ctx, file, rowCh); err != nil {
errCh <- err
}
}()
const workerN = 8
const batchSize = 500
var wg sync.WaitGroup
for i := 0; i < workerN; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
buf := make([]OrderRow, 0, batchSize)
flush := func() bool {
if err := insertBatch(ctx, db, buf); err != nil {
select {
case errCh <- fmt.Errorf("worker=%d: %w", workerID, err):
default:
}
cancel()
return false
}
buf = buf[:0]
return true
}
for r := range rowCh {
buf = append(buf, r)
if len(buf) >= batchSize {
if !flush() {
return
}
}
}
if len(buf) > 0 {
flush()
}
}(i)
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
select {
case err := <-errCh:
return err
default:
return nil
}
case err := <-errCh:
cancel()
return err
case <-ctx.Done():
return ctx.Err()
}
}
这段代码看着不花哨,但线上导入我更信这种。
读文件只有一个 goroutine,避免磁盘乱抢;入库 worker 固定,避免数据库连接池被打爆;channel 有缓冲,但不是无限缓冲,后面慢了前面自然会停下来。这就是背压。
跑之前我还会加几行日志,不然出了问题只能猜:
log.Printf("import start file=%s worker=%d batch=%d", file, workerN, batchSize)
如果日志里看到读取很快、入库慢,那就看 SQL、索引、锁等待。
如果入库很快、整体还是慢,那再看 CSV 解析、磁盘 IO、网络盘。
别一上来就调 goroutine 数量。并发不是越多越快,CSV 导数据库这种活,最后卡住的通常不是 Go,而是数据库连接、事务提交、磁盘刷写。Go 能做的是把节奏控住,别自己先把自己跑死。
免责声明:
本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。
任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。
本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我。
本文转载自:Go语言教程 go go《用 Go 语言 并发处理 CSV 文件到数据库》
版权声明
本站仅做备份收录,仅供研究与教学参考之用。
读者将信息用于其他用途的,全部法律及连带责任由读者自行承担,本站不承担任何责任。









评论