-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMongoDB使用小结:一些常用操作分享
836 lines (551 loc) · 29.1 KB
/
MongoDB使用小结:一些常用操作分享
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
MongoDB使用小结:一些常用操作分享
本文整理了一年多以来我常用的MongoDB操作,涉及mongo-shell、pymongo,既有运维层面也有应用层面,内容有浅有深,这也就是我从零到熟练的历程。
MongoDB的使用之前也分享过一篇,稍微高阶点:见这里:《MongoDB使用小结》
1、shell登陆和显示
假设在本机上有一个端口为17380的MongoDB服务,假设已经把mongo bin文件加入到系统PATH下。
登陆:mongo --port 17380
显示DB:show dbs
进入某DB:use test_cswuyg
显示集合:show tables
2、简单查找
查找文档:db.test_mac_id.find({'a': 'b'})
删除文档:db.test_mac_id.remove({'a': 'b'})
查找找到某一天的数据:
db.a.find({'D' : ISODate('2014-04-21T00:00:00Z')}) 或者 db.a.find({'D' : ISODate('2014-04-21')})
删除某一天的数据:
db.region_mac_id_result.remove({"D" : ISODate('2014-04-17')})
小于2014.6.5的数据:
db.xxx.find({E: {$lt :ISODate('2014-06-05')}})
大于等于2014.6.1的数据:
db.xxx.find({E: {$gte: ISODate("2014-05-29")}}).count()
两个条件:
db.xxx.find({E:{$gte: ISODate("2014-05-29"), $lte: ISODate("2014-06-04")}}).count()
json中的嵌套对象查询,采用“点”的方式:
mongos> db.wyg.find({"a.b": {$exists: true}})
{ "_id" : "c", "a" : { "b" : 10 } }
某个字段存在,且小于1000有多少:
db.stat.find({_: ISODate("2014-06-17"), "123": {$exists: 1, $lte: 1000}}, {"123": 1}).count()
3、存在和遍历统计
存在'i': 1,且存在old_id字段:
mongos> var it = db.test.find({'i': 1, "old_id": {$exists: 1}})
遍历计数1:mongos> var count = 0;while(it.hasNext()){if (it.next()["X"].length==32)++count}print(count)
遍历计数2:mongos> var count = 0;while(it.hasNext()){var item = it.next(); if (item['X'].length==32 && item['_id'] != item['X'])++count;if(!item['X'])++count;}print(count)
4、插入和更新
> db.test.findOne({_id: 'cswuyg'})
null
> db.test.insert({'_id': 'cswuyg', 'super_admin': true})
> db.test.findOne({'_id': 'cswuyg'})
{
"_id" : "cswuyg",
"super_admin" : true
}
db.test.update({'_id': 'cswuyg'}, {$set: {'super_admin': true}})
5、repair 操作
对某个DB执行repair:进入要repair的db,执行db.repairDatabase()
对mongodb整个实例执行repair:numactl --interleave=all /mongod --repair --dbpath=/home/disk1/mongodata/shard/
6、mongodb任务操作
停止某个操作:
复制代码
[xxx]$ mongo --port 17380
MongoDB shell version: 2.4.5
connecting to: 127.0.0.1:17380/test
mongos> db.currentOp()
{ "inprog" : [ ...] }
mongos> db.killOp("shard0001:163415563")
复制代码
批量停止:
db.currentOp().inprog.forEach(function(item){db.killOp(item.opid)})
当查询超过1000秒的,停止:
db.currentOp().inprog.forEach(function(item){if(item.secs_running > 1000 )db.killOp(item.opid)})
停止某个数据源的查询:
db.currentOp().inprog.forEach(function(item){if(item.ns == "cswuyg.cswuyg")db.killOp(item.opid)})
把所有在等待锁的操作显示出来:
db.currentOp().inprog.forEach(function(item){if(item.waitingForLock)print(JSON.stringify(item))})
把处于等待中的分片显示出来:
db.currentOp().inprog.forEach(function(item){if(item.waitingForLock){print(item.opid.substr(0,9));print(item.op);}})
把非等待的分片显示出来:
db.currentOp().inprog.forEach(function(item){if(!item.waitingForLock){var lock_info = item["opid"];print(lock_info.substr(0,9));print(item["op"]);}})
查找所有的查询任务:
db.currentOp().inprog.forEach(function(item){if(item.op=="query"){print(item.opid);}})
查找所有的非查询任务:
db.currentOp().inprog.forEach(function(item){if(item.op!="query"){print(item.opid);}})
查找所有的操作:
db.currentOp().inprog.forEach(function(item){print(item.op, item.opid);});
常用js脚本,可直接复制到mongo-shell下使用:
显示当前所有的任务状态:
print("##########");db.currentOp().inprog.forEach(function(item){if(item.waitingForLock){var lock_info = item["opid"];print("waiting:",lock_info,item.op,item.ns);}});print("----");db.currentOp().inprog.forEach(function(item){if(!item.waitingForLock){var lock_info = item["opid"];print("doing",lock_info,item.op,item.ns);}});print("##########");
杀掉某些特定任务:
(1)
db.currentOp().inprog.forEach(function(item){if(item.waitingForLock){var lock_info = item["opid"];if(item.op=="query" && item.secs_running >60 && item.ns=="cswuyg.cswuyg"){db.killOp(item.opid)}}})
(2)
复制代码
db.currentOp().inprog.forEach(function(item) {
var lock_info = item["opid"];
if (item.op == "query" && item.secs_running > 1000) {
print("kill", item.opid);
db.killOp(item.opid)
}
})
复制代码
7、删除并返回数据
old_item = db.swuyg.findAndModify({query: {"_id": "aabbccdd"}, fields:{"D": 1,'E':1, 'F':1}, remove: true})
fields里面为1的是要返回的数据。
8、分布式集群部署情况
(1) 细致到collection的显示:sh.status()
(2)仅显示分片:
use config; db.shards.find()
{ "_id" : "shard0000", "host" : "xxhost:10001" }
{ "_id" : "shard0001", "host" : "yyhost:10002" }
....
(3)
use admin
db.runCommand({listshards: 1})
列出所有的shard server
9、正则表达式查找
正则表达式查询:
mongos> db.a.find({"tt": /t*/i})
{ "_id" : ObjectId("54b21e0f570cb10de814f86b"), "aa" : "1", "tt" : "tt" }
其中i表明是否是case-insensitive,有i则表示忽略大小写
db.testing.find({"name":/[7-9]/})
当name的值为789这几个数字组成的字符串时,查询命中。
10、查询性能
db.testing.find({name: 123}).explain()
输出结果:
复制代码
{
"cursor" : "BasicCursor",
"isMultiKey" : false,
"n" : 1,
"nscannedObjects" : 10,
"nscanned" : 10,
"nscannedObjectsAllPlans" : 10,
"nscannedAllPlans" : 10,
"scanAndOrder" : false,
"indexOnly" : false,
"nYields" : 0,
"nChunkSkips" : 0,
"millis" : 0,
"indexBounds" : {
},
"server" : "xxx:10001"
}
复制代码
11、更新或插入
当该key不存在的时候执行插入操作,当存在的时候则不管,可以使用setOnInsert
db.wyg.update({'_id': 'id'}, {'$setOnInsert': {'a': 'a'}, '$set': {'b': 'b'}}, true)
当id存在的时候,忽略setOnInsert。
当id存在的时候,如果要插入,则插入{'a': 'a'}
最后的参数true,则是指明,当update不存在的_id时,执行插入操作。默认是false,只更新,不插入。
push、setOnInsert:db.cswuyg.update({"_id": "abc"}, {$push: {"name": "c"}, $setOnInsert: {"cc":"xx"}}, true)
12、计算DB中collection的数量
db.system.namespaces.count()
13、增加数字,采用$inc
db.cswuyg.update({"a.b": {$exists: true}}, {$inc: {'a.b': 2}})
也就是对象a.b的值,增加了2
注意$inc只能用于数值。
14、删除某个key
db.cswuyg.update({'_id': 'c'}, {$unset: {'b': {$exists: true}}})
{ "_id" : "c", "a" : { "b" : 12 }, "b" : 7 }
转变为:
{ "_id" : "c", "a" : { "b" : 12 } }
15、增加key:value
db.cswuyg.update({'_id': 'z'}, {'$set': {'hello': 'z'}})
{ "_id" : "z", "b" : 1 }
转变为:
{ "_id" : "z", "b" : 1, "hello" : "z" }
16、删除数据库、删除表
删除数据库:db.dropDatabase();
删除表:db.mytable.drop();
17、查找到数据只看某列
只显示key名为D的数据:db.test.find({}, {D: 1})
18、查看分片存储情况
(1)所有DB的分片存储信息,包括chunks数、shard key信息:db.printShardingStatus()
(2)db.collection.getShardDistribution() 获取collection各个分片的数据存储情况
(3)sh.status() 显示本mongos集群所有DB的信息, 包含了Shard Key信息
19、查看collection的索引
db.cswuyg.getIndexes()
20、开启某collection的分片功能
1. ./bin/mongo –port 20000
2. mongos> use admin
3. switched to db admin
4. mongos> db.runCommand({'enablesharding"' 'test'})
5. { "ok" : 1 }
开启user collection分片功能:
1. mongos> db.runCommand({'shardcollection': 'test.user', 'key': {'_id': 1}})
{ "collectionsharded" : "test.user", "ok" : 1 }
21、判断当前是否是shard集群
isdbgrid:用来确认当前是否是 Sharding Cluster
复制代码
> db.runCommand({isdbgrid:1});
是:
{ "isdbgrid" : 1, "hostname" : "xxxhost", "ok" : 1 }
不是:
{
"ok" : 0,
"errmsg" : "no such cmd: isdbgrid",
"code" : 59,
"bad cmd" : {
"isdbgrid" : 1
}
}
复制代码
22、$addToSet、$each插入数组数据
mongos> db.cswuyg.find()
{ "_id" : "wyg", "a" : "c", "add" : [ "a", "b" ] }
mongos> db.cswuyg.update({"_id": "wyg"}, {"$set": {"a": "c"}, "$addToSet": {"add": {"$each" :["a", "c"]}}}, true)
mongos> db.cswuyg.find()
{ "_id" : "wyg", "a" : "c", "add" : [ "a", "b", "c" ] }
$each是为了实现list中的每个元素都插入,如果没有$each,则会把整个list作为一个元素插入,变成了2维数组。
$addToSet会判断集合是否需要排重,保证集合不重。$push可以对数组添加元素,但它只是直接插入数据,不做排重。
eg:db.test.update({"a": 1}, {$push: {"name": {$each:["a", "c"]}}})
eg:db.test.update({"a": 1}, {$addToSet: {"name": {$each: ["a", "d"]}}})
不去重插入 pushAll
> db.push.insert({"a": "b", "c": ["c", "d"]})
> db.push.find()
{ "_id" : ObjectId("53e4be775fdf37629312b96c"), "a" : "b", "c" : [ "c", "d" ]
}
> db.push.update({"a":"b"}, {$pushAll:{"c": ["z", "d"]}})
> db.push.find()
{ "_id" : ObjectId("53e4be775fdf37629312b96c"), "a" : "b", "c" : [ "c", "d",
"z", "d" ] }
pushAll跟push类似,不同的是pushAll可以一次插入多个value,而不需要使用$each。
23、刷新配置信息
db.runCommand("flushRouterConfig");
24、批量更新
db.xxx.update({"_id": {$exists: 1}}, {$set: {"_" : ISODate("2014-03-21T00: 00:00Z")}}, true, true)
最后一个参数表示是否要批量更新,如果不指定,则一次只更新一个document。
25、dump DB
mongodump支持从DB磁盘文件、运行中的MongoD服务中dump出bson数据文件。
(1)关闭MongoD之后,从它的DB磁盘文件中dump出数据(注:仅限单实例mongod):
mongodump --dbpath=/home/disk1/mongodata/shard/ -d cswuyg -o /home/disk2/mongodata/shard
参考:
http://docs.mongodb.org/manual/reference/program/mongodump/
http://stackoverflow.com/questions/5191186/how-do-i-dump-data-for-a-given-date
(2)从运行的MongoD中导出指定日期数据,采用-q查询参数:
mongodump -h xxxhost --port 17380 --db cswuyg --collection test -q "{D: {\$gte: {\$date: `date -d "20140410" +%s`000}, \$lt: {\$date: `date +%s`000}}}"
mongodump -dbpath=/home/disk3/mongodb/data/shard1/ -d cswuyg -c test -o /home/disk9/mongodata/shard1_2/ -q "{_:{\$gte:{\$date:`date -d "20140916" +%s`000}, \$lt: {\$date: `date -d "20140918" +%s`000}}}"
dump出来的bson文件去掉了索引、碎片空间,所有相比DB磁盘文件要小很多。
26、restore DB
restore的时候,不能先建索引,必须是restore完数据之后再建索引,否则restore的时候会非常慢。而且一般不需要自己手动建索引,在数据bson文件的同目录下有一个索引bson文件(system.indexes.bson),restore完数据之后会mongorestore自动根据该文件创建索引。
(1)从某个文件restore
mongorestore --host xxxhost --port 17380 --db cswuyg --collection cswuyg ./cswuyg_1406330331/cswuyg/cswuyg_bak.bson
(2)从目录restore
./mongorestore --port 27018 /home/disk2/mongodata/shard/
(3)dump 和 restore configure server:
mongodump --host xxxhost --port 19913 --db config -o /home/work/cswuyg/test/config
mongorestore --host xxxhost --port 19914 /home/work/cswuyg/test/config
27、创建索引
复制代码
看看当前的test collection上有啥索引:
mongos> db.test.getIndexes()
[
{
"v" : 1,
"key" : {
"_id" : 1
},
"ns" : "cswuyg.test",
"name" : "_id_"
}
]
当前只有_id这个默认索引,我要在test collection上为 index 字段创建索引:
mongos> db.test.ensureIndex({"index": 1})
创建完了之后,再看看test collection上的索引有哪些:
mongos> db.test.getIndexes()
[
{
"v" : 1,
"key" : {
"_id" : 1
},
"ns" : "cswuyg.test",
"name" : "_id_"
},
{
"v" : 1,
"key" : {
"index" : 1
},
"ns" : "cswuyg.test",
"name" : "index_1"
}
]
复制代码
创建索引,并指定过期时间:db.a.ensureIndex({'_':-1}, {expireAfterSeconds: 1000}) 1000Second.
修改过期时间: db.runCommand({"collMod": "a", index: {keyPattern:{"_": -1}, expireAfterSeconds: 60}})
28、删除索引
db.post.dropIndexes() 删除post上所有索引
db.post.dropIndex({name: 1}) 删除指定的单个索引
29、唯一索引问题
如果集群在_id上进行了分片,则无法再在其他字段上建立唯一索引:
复制代码
mongos> db.a.ensureIndex({'b': 1}, {'unique': true})
{
"raw" : {
"set_a/xxxhost:20001,yyyhost:20002" : {
"createdCollectionAutomatically" : false,
"numIndexesBefore" : 1,
"ok" : 0,
"errmsg" : "cannot create unique index over { b: 1.0 } with shard key pattern { _id: 1.0 }",
"code" : 67
},
"set_b/xxxhost:30001,yyyhost:30002" : {
"createdCollectionAutomatically" : false,
"numIndexesBefore" : 1,
"ok" : 0,
"errmsg" : "cannot create unique index over { b: 1.0 } with shard key pattern { _id: 1.0 }",
"code" : 67
}
},
"code" : 67,
"ok" : 0,
"errmsg" : "{ set_a/xxxhost:20001,yyyhos:20002: \"cannot create unique index over { b: 1.0 } with shard key pattern { _id: 1.0 }\", set_b/xxxhost:30001,yyyhost:30002: \"cannot create unique index over { b: 1.0 } with shard key pattern { _id: 1.0 }\" }"
}
复制代码
之所以出现这个错误是因为MongoDB无法保证集群中除了片键以外其他字段的唯一性,能保证片键的唯一性是因为文档根据片键进行切分,一个特定的文档只属于一个分片,MongoDB只要保证它在那个分片上唯一就在整个集群中唯一,实现分片集群上的文档唯一性一种方法是在创建片键的时候指定它的唯一性。
30、因迁移导致出现count得到的数字是真实数字的两倍
cswuyg> db.test.find({D: ISODate('2015-08-31'), B: 'active'}).count()
52118
cswuyg> db.test.find({D: ISODate('2015-08-31'), B: 'active'}).forEach(function(item){db.test_count.insert(item)})
cswuyg> db.test_count.count()
26445
解决方法:http://docs.mongodb.org/manual/reference/command/count/#behavior
“On a sharded cluster, count can result in an inaccurate count if orphaned documents exist or if a chunk migration is in progress.
To avoid these situations, on a sharded cluster, use the $group stage of the db.collection.aggregate()method to $sum the documents. ”
31、自定义MongoDB操作函数
可以把自己写的js代码保存在某个地方,让MongoDB加载它,然后就可以在MongoDB的命令行里操作它们。
mongodb shell默认会加载~/.mongorc.js文件
例如以下修改了启动提示文字、左侧提示文字,增加了my_show_shards shell函数用于显示当前sharded collection的chunks在各分片的负载情况:
复制代码
//~/.mongorc.js
//show at begin
var compliment = ["attractive", "intelligent", "like batman"];
var index = Math.floor(Math.random()*3);
print("Hello, you're looking particularly " + compliment[index] + " today!");
//change the prompt
prompt = function(){
if (typeof db == "undefined") {
return "(nodb)> ";
}
// Check the last db operation
try {
db.runCommand({getLastError: 1});
}
catch (e) {
print(e);
}
return db + "> ";
}
//show all shard's chunks
function my_show_shards() {
var config_db = db.getSiblingDB("config");
var collections = {};
var shards = {};
var shard_it = config_db.chunks.find().snapshot();
while (shard_it.hasNext()) {
next_item = shard_it.next();
collections[JSON.stringify(next_item["ns"]).replace(/\"/g, "")] = 1;
shards[JSON.stringify(next_item["shard"]).replace(/\"/g, "")] = 1;
}
var list_collections = [];
var list_shards = [];
for (item in collections) {
list_collections.push(item);
}
for (item in shards) {
list_shards.push(item);
}
list_collections.forEach(function(collec) {
list_shards.forEach(function(item) {
obj = {};
obj["shard"] = item;
obj["ns"] = collec;
it = config_db.chunks.find(obj);
print(collec, item, it.count());
})
})
}
复制代码
32、关闭mongod
mongo admin --port 17380 --eval "db.shutdownServer()"
33、查看chunks信息
eg: 进入到config db下,执行
db.chunks.find()
34、预分片参考
http://blog.zawodny.com/2011/03/06/mongodb-pre-splitting-for-faster-data-loading-and-importing/
35、DB重命名
db.copyDatabase("xx_config_20141113", "xx_config")
use xx_config_20141113
db.dropDatabase();
远程拷贝DB :
db.copyDatabase(fromdb, todb, fromhost, username, password)
db.copyDatabase("t_config", "t_config_v1", "xxxhost: 17380")
这个拷贝过程很慢。
注意,sharded的DB是无法拷贝的,所以sharded的DB也无法采用上面的方式重命名。
参考: http://docs.objectrocket.com/features.html "Your remote database is sharded through mongos."
拷贝collection:
db.collection.copyTo("newcollection")
同样,sharded的collection也无法拷贝。
36、聚合运算
包括:
1、pipeline;
2、map-reduce
管道:
http://docs.mongodb.org/manual/tutorial/aggregation-with-user-preference-data/
http://docs.mongodb.org/manual/reference/operator/aggregation/#aggregation-expression-operators
http://api.mongodb.org/python/current/api/pymongo/collection.html#pymongo.collection.Collection.aggregate
2.6之前的MongoDB,管道不支持超过16MB的返回集合。
可以使用$out操作符,把结果写入到collection中。如果aggregation成功,$out会替换已有的colleciton,但不会修改索引信息,如果失败,则什么都不做。
http://docs.mongodb.org/manual/reference/operator/aggregation/out/#pipe._S_out
37、aggregate pipeline demo
demo1,基础:
cswuyg_test> db.cswuyg_test.aggregate({"$match": {"_": ISODate("2015-02-16"), "$or": [{"13098765": {"$exists": true}}, {"13123456": {"$exists": true}}]}}, {"$group": {"_id": "$_", "13098765": {"$sum": "$13098765"}, "13123456":{"$sum": "$13123456"}}})
demo2,使用磁盘:
db.test.aggregate([{'$match': {'D': {'$nin': ['a', 'b', 'c']}, '_': {'$lte': ISODate("2015-02-27"), '$gte': ISODate("2015-02-26")}}}, {'$group': {'_id': {'a': '$a', 'b': '$b', 'D': '$D'}, 'value': {'$sum': 1}}}], {'allowDiskUse': true})
demo3,指定输出文档:
db.cswuyg.aggregate([ {'$match': {'D': {'$nin': ['a', 'b', 'c']}, '_': {'$lte': ISODate("2015-02-10"), '$gte': ISODate("2015-02-09")}}}, {'$group': {'_id': {'C': '$C', 'D': '$D'}, 'value': {'$sum': 1}}}, {"$out": "cswuyg_out"}], {'allowDiskUse':true})
db.b.aggregate([{$match: {a: {$size: 6}}}, {$group: {_id: {a: '$a'}}}, {$out: 'hh_col'}], {allowDiskUse: true})
注:指定输出文档,只能输出到本DB下。
aggregate练习:
pymongo代码:
[{'$match': {u'I': {u'$in': [XXX']}, u'H': u'id', u'12345678': {u'$exists': True}, '_':{'$lte': datetime.datetime(2015, 6, 1, 23, 59, 59), '$gte': datetime.datetime(2015, 6, 1, 0, 0)}}}, {'$group': {'_id': {u'12345678': u'$12345678', u'G': u'$G'}, 'value': {'$sum': 1}}}]
shell下代码:
db.test.aggregate([{$match: {_:ISODate("2015-06-01"), "H": "id", "12345678": {"$exists": true}, "I": "XXX"}}, {$group: {_id: {"12345678": "$12345678", "G": "$G"}, "value": {"$sum": 1}}}], {allowDiskUse:true})
38、修改Key:Value中的Value
给字段B的值加上大括号'{':
db.test.find({_:ISODate("2014-11-02")}).forEach(function(item){if(/{.+}/.test(item["B"])){}else{print(item["B"]);db.test.update({"_id": item["_id"]}, {"$set": {"B": "{" + item["B"] + "}"}})}})
39、修改primary shard
db.runCommand({"movePrimary": "test", "to": "shard0000"})
这样子test DB 里的非sharded cocllection数据就会被存放到shard0000中,非空DB的话,可能会有一个migrate的过程。
40、 mongodb默认开启autobalancer
balancer是sharded集群的负载均衡工具,新建集群的时候默认开启,除非你在config里把它关闭掉:
config> db.settings.find()
{ "_id" : "chunksize", "value" : 64 }
{ "_id" : "balancer", "activeWindow" : { "start" : "14:00", "stop" : "19:30" }, "stopped" : false}
activeWindow指定autobalancer执行均衡的时间窗口。
stopped说明是否使用autobalancer。
手动启动balancer:sh.startBalancer()
判断当前balancer是否在跑:sh.isBalancerRunning()
41、MongoDB插入性能优化
插入性能:200W的数据,在之前没有排序就直接插入,耗时4小时多,现在,做了排序,插入只需要5分钟。排序对于单机版本的MongoDB性能更佳,避免了随机插入引发的频繁随机IO。
排序:在做分文件排序的时候,文件分得越小,排序越快,当然也不能小到1,否则频繁打开文件也耗费时间。
42、MongoDB数组操作
1、更新/插入数据,不考虑重复值:
mongos> db.test.update({"helo":"he2"}, {"$push": {"name":"b"}})
多次插入后结果:
{ "_id" : ObjectId("54a7aa2be53662aebc28585f"), "helo" : "he2", "name" : [ "a", "b", "b" ] }
2、更新/插入数据,保证不重复:
mongos> db.test.update({"helo":"she"}, {"$addToSet": {"name":"b"}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 0 })
多次插入后结果:
{ "_id" : ObjectId("54dd6e1b570cb10de814f86d"), "helo" : "she", "name" : [ "b" ] }
保证只有一个值。
3、数组元素个数:
$size 用来指定数组的元素个数,显示fruit数组长度为3的document:
mongos> db.a.find({"fruit": {$size: 3}})
{ "_id" : ObjectId("54b334798220cd3ad74db314"), "fruit" : [ "apple", "orange", "cherry" ] }
43、更换Key: Value中的Key
verRelease 换为 TEST
db.test.find().forEach(function(item){db.test.update({_id:item["_id"]}, {"$set": {"TEST": item["verRelease"]}}, {"$unset":{"verRelease":{"exists":true}}})})
或者更新部分修改为:
db.test.update({D : ISODate("2014-11-02")}, {$rename : {"AcT": "AM"}}, false, true)
44、手动在shell下moveChunk
config> sh.moveChunk("xx.yy", { "_id" : { "D" : ISODate("2015-02-24T00:00:00Z"), "id" : "3f" } }, "shard0003")
如果出现错误,参考这里:可能需要重启
http://stackoverflow.com/questions/26640861/movechunk-failed-to-engage-to-shard-in-the-data-transfer-cant-accept-new-chunk
45、MongoDB升级后的兼容问题
MongoDB 2.4切换到2.6之后,出现数据没有插入,pymongo代码:
update_obj = {"$set" : {"a": 1}}
update_obj["$inc"] = inc_objs
update_obj["$push"] = list_objs
db.test.update({"_id": id}, update_obj, True)
2.6里,inc如果是空的,则会导致整条日志没有插入,2.4则inc为空也可以正常运行。
46、格式化json显示
db.collection.find().pretty()
47、更新replica集群的域名信息
cfg = rs.conf()
cfg.members[0].host = "xxxhost: 20000"
cfg.members[1].host = "yyyhost: 20001"
cfg.members[2].host = "zzzhost: 20002"
rs.reconfig(cfg)
48、不要直接修改local.system.replset
不要直接修改local.system.replset,因为他只能修改本机器的本地信息。
但是如果出现了:
{
"errmsg" : "exception: can't use localhost in repl set member names except when using it for all members",
"code" : 13393,
"ok" : 0
}
这样的错误,那就要修改本机的local.system.replset,然后重启。
49、排重统计
(1)aggregate
result = db.flu_test.aggregate([{$match: {_: ISODate("2015-05-01")}}, {$group:{_id:"$F", value: {$sum:1}}}], {allowDiskUse:true})
result.itcount()
(2)distinct
flu_test> db.flu_test.distinct('F', {_:ISODate("2015-06-22")})
但是,由于distinct将结果保存在list中,所以很容易触发文档超过16MB的错误:
2015-06-23T15:31:34.479+0800 distinct failed: {
"errmsg" : "exception: distinct too big, 16mb cap",
"code" : 17217,
"ok" : 0
} at src/mongo/shell/collection.js:1108
非排重文档量统计:
mongos> count = db.flu_test.aggregate([{$match:{_:ISODate("2015-05-21")}}, {$group:{_id:null, value: {$sum:1}}}], {allowDiskUse:true})
{ "_id" : null, "value" : 3338987 }
50、pymongo优先读取副本集Secondary节点
优先读取副本集Secondary节点,可以减少primary节点负担,在primary节点跟secondary节点同步延迟较短、业务对数据不要求实时一致时可以利用副本集做读写分离和负载均衡。
副本集集群的读取有这几种使用方式:
primary: 默认参数,只从主节点读取;
primaryPreferred: 大部分从主节点上读取,主节点不可用时从Secondary节点读取;
secondary: 只从Secondary节点上进行读取操作;
secondaryPreferred: 优先从Secondary节点读取,Secondary节点不可用时从主节点读取;
nearest: 从网络延迟最低的节点上读取。
(1)测试1 优先从secondary读取数据:
复制代码
import pymongo
client = pymongo.MongoReplicaSetClient('xxxhost: yyyport', replicaSet='my_set', readPreference='secondaryPreferred')
print client.read_preference # 显示当前的读取设定
for i in xrange(1, 10000): # 循环10000次,用mongostat观察查询负载
a = client['history']['20140409'].find_one({"ver_code": "128"})
print a
复制代码
(2)测试2 直接连接某Secondary节点读取数据:
import pymongo
client = pymongo.MongoClient('xxxhost', yyyport, slaveOk=True)
a = client['msgdc_ip']['query_ip'].find().count()
print a
参考:
http://www.lanceyan.com/category/tech/mongodb
http://emptysqua.re/blog/reading-from-mongodb-replica-sets-with-pymongo/
http://api.mongodb.org/python/current/api/pymongo/read_preferences.html#module-pymongo.read_preferences
http://api.mongodb.org/python/current/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient
注意:3.0之后MongoReplicaSetClient函数是要被放弃的。
但是测试时发现:在较低版本中,需要使用MongoReplicaSetClient,MongoClient无法实现 pymongo.ReadPreference.SECONDARY_PREFERRED功能。
2015.12.28补充:
51、为副本集设置标签
可以为副本集中的每个成员设置tag(标签),设置标签的好处是后面读数据时,应用可以指定从某类标签的副本上读数据。
复制代码
为replica 设置tag
在副本shell下执行:
var conf = rs.conf()
conf.members[0].tags = { "location": "beijing" }
conf.members[1].tags = { "location": "hangzhou"}
conf.members[2].tags = { "location": "guangzhou" }
rs.reconfig(conf)
复制代码
参考:https://docs.mongodb.org/v3.0/tutorial/configure-replica-set-tag-sets/
52、副本集碎片整理的一种方法
使用MMAPv1存储引擎时,对于频繁大数据量的写入删除操作,碎片问题会变得很严重。在数据同步耗时不严重的情况下,我们不需要对每个副本做repair,而是轮流“卸下副本,删除对应的磁盘文件,重新挂上副本”。每个重新挂上的副本都会自动去重新同步一遍数据,碎片问题就解决了。
53、存储引擎升级为wiredTiger
我们当前的版本是MongoDB3.0.6,没有开启wiredTiger引擎,现在打算升级到wiredTiger引擎。
我们是Shared Cluster,shard是Replica Set。升级比较简单,只需要逐步对每一个副本都执行存储引擎升级即可,不影响线上服务。
升级时,只在启动命令中添加:--storageEngine wiredTiger。
步骤:首先,下掉一个副本;然后,把副本的磁盘文件删除掉;接着,在该副本的启动命令中添加--storageEngine wiredTiger后启动。这就升级完一个副本,等副本数据同步完成之后,其它副本也照样操作(或者处理完一个副本之后,拷贝该副本磁盘文件替换掉另一个副本的磁盘文件)。风险:如果数据量巨大,且有建索引的需求,容易出现内存用尽。
分片集群的configure server可以不升级。
升级之后磁盘存储优化效果极度明显,22GB的数据会被精简到1.3GB。
升级后的磁盘文件完全变了,所以不同存储引擎下的磁盘文件不能混用。
升级参考:https://docs.mongodb.org/manual/tutorial/change-sharded-cluster-wiredtiger/
54、oplogSizeMB不要设置得太大
启动配置中的这个字段是为了设置oplog collection的大小,oplog是操作记录,它是一个capped collection,在副本集群中,设置得太小可能导致secondary无法及时从primary同步数据。默认情况下是磁盘大小的5%。但是,如果这个字段设置得太大,可能导致暴内存,oplog的数据几乎是完全加载在内存中,一旦太大,必然暴内存,导致OOM。而且因为这个collection是capped,MongoDB启动之后无法修改其大小。
本文所在:http://www.cnblogs.com/cswuyg/p/4595799.html
分类: 数据处理,MongoDB