Blame view

components/db/mongo.go 6.24 KB
ee23102d   zhangqijia   支持mongo, grpc接服务器
1
2
3
4
5
6
7
8
9
10
  package db
  
  import (
  	"context"
  	"fmt"
  	"go.mongodb.org/mongo-driver/bson"
  	"go.mongodb.org/mongo-driver/mongo"
  	"go.mongodb.org/mongo-driver/mongo/options"
  	"go.mongodb.org/mongo-driver/mongo/readpref"
  	"go.mongodb.org/mongo-driver/x/bsonx"
3592dfd3   zhangqijia   重构models, 索引唯一索引
11
  	"sort"
ee23102d   zhangqijia   支持mongo, grpc接服务器
12
13
14
  	"strconv"
  	"time"
  )
eb417b0b   zhangqijia   reactor mongo
15
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
16
17
18
19
  // Package-level MongoDB handles. Both are assigned by Connect.
  var (
  	// MongoClient is the client created by Connect.
  	MongoClient *mongo.Client
  	// MongoDatabase is the database handle Connect derives from MongoClient;
  	// the collection helpers in this file read it.
  	MongoDatabase *mongo.Database
  )
ee23102d   zhangqijia   支持mongo, grpc接服务器
20
21
  
  //初始化
3592dfd3   zhangqijia   重构models, 索引唯一索引
22
  func Connect(user, password, host string,port int, MaxNum int, timeOut int, dbname string) error {
ee23102d   zhangqijia   支持mongo, grpc接服务器
23
  	var uri string
eb417b0b   zhangqijia   reactor mongo
24
25
26
  	if user!= "" {
  		//uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/%s?w=majority", conf.User, conf.Password, conf.Host, conf.Port, conf.DBName)
  		uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/?w=majority", user, password, host, port)
ee23102d   zhangqijia   支持mongo, grpc接服务器
27
  	}else {
eb417b0b   zhangqijia   reactor mongo
28
29
  		//uri = fmt.Sprintf("mongodb://%s:%d/%s?w=majority", conf.Host, conf.Port, conf.DBName)
  		uri = fmt.Sprintf("mongodb://%s:%d/?w=majority", host, port)
ee23102d   zhangqijia   支持mongo, grpc接服务器
30
31
  	}
  	// 设置连接超时时间
eb417b0b   zhangqijia   reactor mongo
32
  	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeOut))
ee23102d   zhangqijia   支持mongo, grpc接服务器
33
34
35
36
  	defer cancel()
  	// 通过传进来的uri连接相关的配置
  	o := options.Client().ApplyURI(uri)
  	// 设置最大连接数 - 默认是100 ,不设置就是最大 max 64
eb417b0b   zhangqijia   reactor mongo
37
  	o.SetMaxPoolSize(uint64(MaxNum))
ee23102d   zhangqijia   支持mongo, grpc接服务器
38
  	// 发起链接
eb417b0b   zhangqijia   reactor mongo
39
  	var err error
3592dfd3   zhangqijia   重构models, 索引唯一索引
40
  	MongoClient , err = mongo.Connect(ctx, o)
ee23102d   zhangqijia   支持mongo, grpc接服务器
41
  	if err != nil {
ee23102d   zhangqijia   支持mongo, grpc接服务器
42
43
44
  		return err
  	}
  	// 判断服务是不是可用
3592dfd3   zhangqijia   重构models, 索引唯一索引
45
  	if err = MongoClient.Ping(context.Background(), readpref.Primary()); err != nil {
ee23102d   zhangqijia   支持mongo, grpc接服务器
46
47
  		return err
  	}
3592dfd3   zhangqijia   重构models, 索引唯一索引
48
49
  
  	MongoDatabase = MongoClient.Database(dbname)
ee23102d   zhangqijia   支持mongo, grpc接服务器
50
51
52
  	return nil
  }
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
  // CreateCollection creates the named collection in MongoDatabase unless a
  // collection with that name is already listed, in which case it returns nil
  // without touching the database.
  //
  // It returns the error from listing the existing collection names or from
  // the create call itself.
  func CreateCollection(collection string) error {
  	colls, err := MongoDatabase.ListCollectionNames(context.TODO(), bson.D{})
  	if err != nil {
  		return err
  	}
  	// sort.SearchStrings is a binary search and needs sorted input.
  	sort.Strings(colls)
  	pos := sort.SearchStrings(colls, collection)
  	if pos < len(colls) && colls[pos] == collection {
  		// Already present: nothing to create.
  		return nil
  	}
  	return MongoDatabase.CreateCollection(context.TODO(), collection)
  }
  
  func SetUnique(collection string, key string) (string, error)  {
  	return MongoDatabase.Collection(collection).Indexes().CreateOne(
  		context.TODO(),
  		mongo.IndexModel{
  			Keys   : bsonx.Doc{{key, bsonx.Int32(1)}},
  			Options: options.Index().SetUnique(true),
  		},
  	)
  }
  
eb417b0b   zhangqijia   reactor mongo
74
  type MgoColl struct {
3592dfd3   zhangqijia   重构models, 索引唯一索引
75
  	collection *mongo.Collection
ee23102d   zhangqijia   支持mongo, grpc接服务器
76
  
eb417b0b   zhangqijia   reactor mongo
77
78
  	pri interface{}
  	schema interface{}
ee23102d   zhangqijia   支持mongo, grpc接服务器
79
80
81
82
83
84
85
86
87
  }
  
  // GetBsonD returns a single-element bson.D holding the pair key/value.
  func GetBsonD(key string, value interface{}) interface{} {
  	pair := bson.E{Key: key, Value: value}
  	return bson.D{pair}
  }
  // GetBsonM returns a bson.M containing the single entry key -> value.
  func GetBsonM(key string, value interface{}) interface{} {
  	doc := bson.M{}
  	doc[key] = value
  	return doc
  }
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
88
  func NewMongoColl(collection string, pri, schema interface{}) *MgoColl {
eb417b0b   zhangqijia   reactor mongo
89
  	return &MgoColl{
3592dfd3   zhangqijia   重构models, 索引唯一索引
90
  		collection: MongoDatabase.Collection(collection),
eb417b0b   zhangqijia   reactor mongo
91
92
93
  
  		pri: pri,
  		schema: schema,
ee23102d   zhangqijia   支持mongo, grpc接服务器
94
95
96
  	}
  }
  
eb417b0b   zhangqijia   reactor mongo
97
  func (m *MgoColl)SetDatabase(databases string)  {
ee23102d   zhangqijia   支持mongo, grpc接服务器
98
99
  	//m.db = databases
  }
ee23102d   zhangqijia   支持mongo, grpc接服务器
100
  
ee23102d   zhangqijia   支持mongo, grpc接服务器
101
  // 查询单个
eb417b0b   zhangqijia   reactor mongo
102
  func (m *MgoColl) FindOneKV(key string, value interface{}) *mongo.SingleResult {
ee23102d   zhangqijia   支持mongo, grpc接服务器
103
104
  	//collection.
  	filter := bson.D{ {key, value}}
3592dfd3   zhangqijia   重构models, 索引唯一索引
105
  	singleResult := m.collection.FindOne(context.TODO(), filter)
ee23102d   zhangqijia   支持mongo, grpc接服务器
106
107
108
  	return singleResult
  }
  
eb417b0b   zhangqijia   reactor mongo
109
  func (m *MgoColl) FindOne(pri interface{}) *mongo.SingleResult {
ee23102d   zhangqijia   支持mongo, grpc接服务器
110
  	//collection.
3592dfd3   zhangqijia   重构models, 索引唯一索引
111
  	singleResult := m.collection.FindOne(context.TODO(), pri)
ee23102d   zhangqijia   支持mongo, grpc接服务器
112
113
114
115
  	return singleResult
  }
  
  //插入单个
eb417b0b   zhangqijia   reactor mongo
116
  func (m *MgoColl) InsertOne(value interface{}) *mongo.InsertOneResult {
3592dfd3   zhangqijia   重构models, 索引唯一索引
117
  	insertResult, err := m.collection.InsertOne(context.TODO(), value)
ee23102d   zhangqijia   支持mongo, grpc接服务器
118
119
120
121
122
123
124
  	if err != nil {
  		fmt.Println(err)
  	}
  	return insertResult
  }
  
  //查询集合里有多少数据
eb417b0b   zhangqijia   reactor mongo
125
  func (m *MgoColl) CollectionCount() (string, int64) {
3592dfd3   zhangqijia   重构models, 索引唯一索引
126
127
  	size, _ := m.collection.EstimatedDocumentCount(context.TODO())
  	return m.collection.Name(), size
ee23102d   zhangqijia   支持mongo, grpc接服务器
128
129
130
  }
  
  //按选项查询集合 Skip 跳过 Limit 读取数量 sort 1 ,-1 . 1 为最初时间读取 , -1 为最新时间读取
eb417b0b   zhangqijia   reactor mongo
131
  func (m *MgoColl) CollectionDocuments(Skip, Limit int64, sort int) *mongo.Cursor {
ee23102d   zhangqijia   支持mongo, grpc接服务器
132
133
134
135
136
137
  	SORT := bson.D{
  		{"_id", sort}} //filter := bson.D{ {key,value}}
  	filter := bson.D{
  	{}}
  	findOptions := options.Find().SetSort(SORT).SetLimit(Limit).SetSkip(Skip)
  	//findOptions.SetLimit(i)
3592dfd3   zhangqijia   重构models, 索引唯一索引
138
  	temp, _ := m.collection.Find(context.Background(), filter, findOptions)
ee23102d   zhangqijia   支持mongo, grpc接服务器
139
140
141
142
  	return temp
  }
  
  //获取集合创建时间和编号
eb417b0b   zhangqijia   reactor mongo
143
  func (m *MgoColl) ParsingId(result string) (time.Time, uint64) {
ee23102d   zhangqijia   支持mongo, grpc接服务器
144
145
146
147
148
149
150
151
152
  	temp1 := result[:8]
  	timestamp, _ := strconv.ParseInt(temp1, 16, 64)
  	dateTime := time.Unix(timestamp, 0) //这是截获情报时间 时间格式 2019-04-24 09:23:39 +0800 CST
  	temp2 := result[18:]
  	count, _ := strconv.ParseUint(temp2, 16, 64) //截获情报的编号
  	return dateTime, count
  }
  
  //删除文章和查询文章
eb417b0b   zhangqijia   reactor mongo
153
  func (m *MgoColl) DeleteAndFind(key string, value interface{}) (int64, *mongo.SingleResult) {
ee23102d   zhangqijia   支持mongo, grpc接服务器
154
155
  	filter := bson.D{
  		{key, value}}
3592dfd3   zhangqijia   重构models, 索引唯一索引
156
157
  	singleResult := m.collection.FindOne(context.TODO(), filter)
  	DeleteResult, err := m.collection.DeleteOne(context.TODO(), filter, nil)
ee23102d   zhangqijia   支持mongo, grpc接服务器
158
159
160
161
162
163
164
  	if err != nil {
  		fmt.Println("删除时出现错误,你删不掉的~")
  	}
  	return DeleteResult.DeletedCount, singleResult
  }
  
  //删除文章
eb417b0b   zhangqijia   reactor mongo
165
  func (m *MgoColl) Delete(key string, value interface{}) int64 {
ee23102d   zhangqijia   支持mongo, grpc接服务器
166
  	filter := bson.D{ {key, value}}
3592dfd3   zhangqijia   重构models, 索引唯一索引
167
  	count, err := m.collection.DeleteOne(context.TODO(), filter, nil)
ee23102d   zhangqijia   支持mongo, grpc接服务器
168
169
170
171
172
173
174
175
  	if err != nil {
  		fmt.Println(err)
  	}
  	return count.DeletedCount
  
  }
  
  //删除多个
eb417b0b   zhangqijia   reactor mongo
176
  func (m *MgoColl) DeleteMany(key string, value interface{}) int64 {
ee23102d   zhangqijia   支持mongo, grpc接服务器
177
178
  	filter := bson.D{ {key, value}}
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
179
  	count, err := m.collection.DeleteMany(context.TODO(), filter)
ee23102d   zhangqijia   支持mongo, grpc接服务器
180
181
182
183
184
185
186
  	if err != nil {
  		fmt.Println(err)
  	}
  	return count.DeletedCount
  }
  
  //索引
3592dfd3   zhangqijia   重构models, 索引唯一索引
187
188
  func (m *MgoColl) SetUnique(key string){
  	m.collection.Indexes().CreateOne(
ee23102d   zhangqijia   支持mongo, grpc接服务器
189
190
191
192
193
194
195
196
197
  		context.Background(),
  		mongo.IndexModel{
  			Keys   : bsonx.Doc{{key, bsonx.Int32(1)}},
  			Options: options.Index().SetUnique(true),
  		},
  	)
  }
  
  //更新&保存
eb417b0b   zhangqijia   reactor mongo
198
  func (m *MgoColl) FindOneAndUpdate(filter interface{}, update interface{})*mongo.SingleResult {
ee23102d   zhangqijia   支持mongo, grpc接服务器
199
200
201
  	//filter := bson.M{"name": "x", "array.name": "b"}
  	//update := bson.M{"array.$[item].detail": "test"}
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
202
  	res := m.collection.FindOneAndUpdate(context.Background(),
ee23102d   zhangqijia   支持mongo, grpc接服务器
203
204
205
206
207
208
209
210
  		filter,
  		bson.M{"$set": update})
  	if res.Err() != nil {
  		return nil
  	}
  	return res
  }
  
eb417b0b   zhangqijia   reactor mongo
211
  func (m *MgoColl) UpdateOne(filter interface{}, update interface{})*mongo.UpdateResult {
3592dfd3   zhangqijia   重构models, 索引唯一索引
212
  	res, err := m.collection.UpdateOne(context.TODO(), filter, update)
ee23102d   zhangqijia   支持mongo, grpc接服务器
213
214
215
216
217
  	if err != nil {
  		return nil
  	}
  
  	return res
eb417b0b   zhangqijia   reactor mongo
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
  }
  
  
  // Load queries one document using m.pri as the filter and decodes it into
  // m.schema, returning the decode error, if any.
  func (m *MgoColl) Load() error {
  	return m.FindOne(m.pri).Decode(m.schema)
  }
  
  // Create inserts m.schema through InsertOne; the insert result is discarded.
  func (m *MgoColl) Create() {
  	_ = m.InsertOne(m.schema)
  }
  
  // Update runs FindOneAndUpdate with m.pri as the filter and update as the
  // fields to set; the result is discarded.
  func (m *MgoColl) Update(update interface{}) {
  	_ = m.FindOneAndUpdate(m.pri, update)
  }
  
  func (m *MgoColl)Save() {
  	m.FindOneAndUpdate(m.pri, m.schema)
ee23102d   zhangqijia   支持mongo, grpc接服务器
240
  }