您可以检查以下内容。我试图 复制 该问题,并可以 确认 错误
ERROR: NameError("name 'event' is not defined")
根据您的示例和 我自己的表* ,我使用了DynamoDb流中的 模拟 INSERT :event
*
{ "Records": [ { "eventID": "b8b993cf16d1aacb61b40411b39e0b1f", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1", "dynamodb": { "ApproximateCreationDateTime": 1595922821.0, "Keys": { "id": { "N": "1" } }, "NewImage": { "last_name": { "S": "V" }, "id": { "N": "1" }, "age": { "S": "2" } }, "SequenceNumber": "25200000000020406897812", "SizeBytes": 22, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569" }, { "eventID": "e5d5bec988945c06ffc879cf16b89bf7", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1", "dynamodb": { "ApproximateCreationDateTime": 1595922821.0, "Keys": { "id": { "N": "9" } }, "NewImage": { "last_name": { "S": "ADD" }, "id": { "N": "9" }, "age": { "S": "95" } }, "SequenceNumber": "25300000000020406897813", "SizeBytes": 25, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569" }, { "eventID": "f1a7c9736253b5ef28ced38ed5ff645b", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1", "dynamodb": { "ApproximateCreationDateTime": 1595922821.0, "Keys": { "id": { "N": "2" } }, "NewImage": { "last_name": { "S": "JJ" }, "id": { "N": "2" }, "age": { "S": "7" } }, "SequenceNumber": "25400000000020406897819", "SizeBytes": 23, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569" }, { "eventID": "bfcbad9dc19883e4172e6dc25e66637b", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1", "dynamodb": { "ApproximateCreationDateTime": 1595922821.0, "Keys": { "id": { "N": "10" } }, "NewImage": { "last_name": { "S": "ADD" }, "id": { "N": "10" }, "age": { "S": "95" } }, "SequenceNumber": "25500000000020406897820", "SizeBytes": 25, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569" } ]}
修改示例
event:
{ "Records": [ { "eventID": "4e4629c88aa00e366c89a293d9c82d54", "eventName": "MODIFY", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1", "dynamodb": { "ApproximateCreationDateTime": 1595924589.0, "Keys": { "id": { "N": "2" } }, "NewImage": { "last_name": { "S": "zhgdhfgdh" }, "id": { "N": "2" }, "age": { "S": "7" } }, "OldImage": { "last_name": { "S": "JJ" }, "id": { "N": "2" }, "age": { "S": "7" } }, "SequenceNumber": "25600000000020408264140", "SizeBytes": 49, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-1:34234:table/newtable/stream/2020-07-28T06:59:38.569" } ]}
我可以 确认 的lambda函数的修改代码现在 不会产生错误 :
import boto3import jsonimport refrom requests_aws4auth import AWS4Authfrom elasticsearch import Elasticsearch, RequestsHttpConnectionsession = boto3.session.Session()credentials = session.get_credentials()s3 = session.resource('s3')awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, session.region_name, 'es', session_token=credentials.token)es = Elasticsearch( ['https://vpc-test-dmamain-452frn764ggb4a.us-east-1.es.amazonaws.com'], use_ssl=True, verify_certs=True, http_auth=awsauth, connection_class=RequestsHttpConnection)reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size", "_timestamp", "_ttl"]def lambda_handler(event, context): print(event) #dynamodb = boto3.resource('dynamodb') # Loop over the DynamoDB Stream records for record in event['Records']: if record['eventName'] == "INSERT": insert_document(event, es, record) elif record['eventName'] == "REMOVE": remove_document(event, es, record) elif record['eventName'] == "MODIFY": modify_document(event, es, record)# Process MODIFY eventsdef modify_document(event, es, record): table = getTable(record) print("Dynamo Table: " + table) docId = docid(event, event) print("KEY") print(docId) # Unmarshal the DynamoDB JSON to a normal JSON doc = json.dumps(document(event)) print("Updated document:") print(doc) # We reindex the whole document as ES accepts partial docs es.index(index=table, body=doc, id=docId, doc_type=table, refresh=True) print("Successly modified - Index: " , table , " - document ID: " , docId)def remove_document(event, es, record): table = getTable(record) print("Dynamo Table: " + table) docId = docid(event, event) print("Deleting document ID: ", docId) es.delete(index=table, id=docId, doc_type=table, refresh=True) print("Successly removed - Index: ", table, " - document ID: " , docId)# Process INSERT eventsdef insert_document(event, es, record): table = getTable(record) print("Dynamo Table: " + table) # Create index if missing if es.indices.exists(table) == False: print("Create missing index: " + table) es.indices.create(table, body='{"settings": { "index.mapping.coerce": true } }') print("Index created: " + table) # Unmarshal the DynamoDB JSON to a normal JSON doc = json.dumps(document(event)) print("New document to Index:") print(doc) newId = docid(event, record) es.index(index=table, body=doc, id=newId, doc_type=table, refresh=True) print("Successly inserted - Index: " , table + " - document ID: " , newId)def getTable(record): p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+') m = p.match(record['eventSourceARN']) if m is None: raise Exception("Table not found in SourceARN") return m.group(1).lower()def document(event): result = [] for r in event['Records']: tmp = {} for k, v in r['dynamodb']['NewImage'].items(): if "S" in v.keys() or "BOOL" in v.keys(): tmp[k] = v.get('S', v.get('BOOL', False)) elif 'NULL' in v: tmp[k] = None result.append(tmp) for i in result: return idef docid(event, record): result = [] for r in event['Records']: tmp = {} for k, v in r['dynamodb']['Keys'].items(): if "S" in v.keys() or "BOOL" in v.keys(): tmp[k] = v.get('S', v.get('BOOL', False)) elif 'NULL' in v: tmp[k] = None result.append(tmp) for newId in result: return newId
我 尚未验证
数据是否正确写入,修改或插入了ElasticSearch。但是我运行了ES域,并在lambda中使用了它来验证lambda是否可以连接到它并运行查询。
lambda的INSERT事件输出示例:
Dynamo Table: newtableNew document to Index:{"last_name": "V", "age": "2"}Successly inserted - Index: newtable - document ID: {}Dynamo Table: newtableNew document to Index:{"last_name": "V", "age": "2"}Successly inserted - Index: newtable - document ID: {}Dynamo Table: newtableNew document to Index:{"last_name": "V", "age": "2"}Successly inserted - Index: newtable - document ID: {}Dynamo Table: newtableNew document to Index:{"last_name": "V", "age": "2"}Successly inserted - Index: newtable - document ID: {}Example output from lambda from MODIFY event:
更新文件:
{ "last_name": "zhgdhfgdh", "age": "7"}Successly modified - Index: newtable - document ID: {}
我认为
docid如果它可以正常工作,则需要进一步调查,因为它似乎返回空dict:
document ID: {}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)