[go: nahoru, domu]

Skip to content

Commit

Permalink
PHOENIX-2198 Support correlate variable
Browse files Browse the repository at this point in the history
  • Loading branch information
maryannxue committed Sep 9, 2015
1 parent d18afe1 commit 2acb38a
Show file tree
Hide file tree
Showing 20 changed files with 744 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,14 @@ public AggregatePlan(
StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy,
Expression having) {
super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, groupBy, parallelIteratorFactory);
this(context, statement, table, projector, limit, orderBy, parallelIteratorFactory, groupBy, having, null);
}

/**
 * Full constructor that additionally accepts a dynamic filter and hands it to
 * the superclass constructor.
 *
 * @param context statement context; also supplies the parameter metadata
 *        (via its bind manager) and the aggregators (via its aggregation manager)
 * @param statement the statement this plan was compiled from
 * @param table the table the plan reads
 * @param projector row projector for the result
 * @param limit row limit, forwarded unchanged to the superclass
 * @param orderBy ordering, forwarded unchanged to the superclass
 * @param parallelIteratorFactory iterator factory, forwarded unchanged to the superclass
 * @param groupBy grouping, forwarded unchanged to the superclass
 * @param having HAVING expression stored on this plan
 * @param dynamicFilter filter forwarded to the superclass; the public
 *        constructor passes {@code null} here
 */
private AggregatePlan(
        StatementContext context,
        FilterableStatement statement,
        TableRef table,
        RowProjector projector,
        Integer limit,
        OrderBy orderBy,
        ParallelIteratorFactory parallelIteratorFactory,
        GroupBy groupBy,
        Expression having,
        Expression dynamicFilter) {
    super(context, statement, table, projector,
            context.getBindManager().getParameterMetaData(),
            limit, orderBy, groupBy, parallelIteratorFactory, dynamicFilter);
    this.aggregators = context.getAggregationManager().getAggregators();
    this.having = having;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.WhereCompiler;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ProjectedColumnExpression;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
Expand Down Expand Up @@ -93,12 +95,19 @@ public abstract class BaseQueryPlan implements QueryPlan {
protected final Integer limit;
protected final OrderBy orderBy;
protected final GroupBy groupBy;
protected final ParallelIteratorFactory parallelIteratorFactory;
protected final ParallelIteratorFactory parallelIteratorFactory;
/*
* The filter expression that contains a CorrelateVariableFieldAccessExpression
* and will have an impact on the ScanRanges. It will be recompiled at runtime
* immediately before creating the ResultIterator.
*/
protected final Expression dynamicFilter;

protected BaseQueryPlan(
StatementContext context, FilterableStatement statement, TableRef table,
RowProjector projection, ParameterMetaData paramMetaData, Integer limit, OrderBy orderBy,
GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory) {
GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory,
Expression dynamicFilter) {
this.context = context;
this.statement = statement;
this.tableRef = table;
Expand All @@ -108,6 +117,7 @@ protected BaseQueryPlan(
this.orderBy = orderBy;
this.groupBy = groupBy;
this.parallelIteratorFactory = parallelIteratorFactory;
this.dynamicFilter = dynamicFilter;
}

@Override
Expand Down Expand Up @@ -141,6 +151,10 @@ public Integer getLimit() {
public RowProjector getProjector() {
return projection;
}

/**
 * Returns the dynamic filter this plan was constructed with.
 *
 * @return the dynamic filter expression, or {@code null} if none was supplied
 */
public Expression getDynamicFilter() {
    return this.dynamicFilter;
}

// /**
// * Sets up an id used to do round robin queue processing on the server
Expand Down Expand Up @@ -175,6 +189,10 @@ public final ResultIterator iterator(final List<? extends SQLCloseable> dependen
Scan scan = context.getScan();
PTable table = context.getCurrentTable().getTable();

if (dynamicFilter != null) {
WhereCompiler.compile(context, statement, null, Collections.singletonList(dynamicFilter), false, null);
}

if (OrderBy.REV_ROW_KEY_ORDER_BY.equals(orderBy)) {
ScanUtil.setReversed(scan);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.execute;

import java.io.IOException;
import java.sql.SQLException;
import java.util.List;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.ValueBitSet;
import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.SchemaUtil;

import com.google.common.collect.Lists;

public class CorrelatePlan extends DelegateQueryPlan {
private final QueryPlan rhs;
private final String variableId;
private final JoinType joinType;
private final boolean isSingleValueOnly;
private final RuntimeContext runtimeContext;
private final KeyValueSchema joinedSchema;
private final KeyValueSchema lhsSchema;
private final KeyValueSchema rhsSchema;
private final int rhsFieldPosition;

public CorrelatePlan(QueryPlan lhs, QueryPlan rhs, String variableId,
JoinType joinType, boolean isSingleValueOnly,
RuntimeContext runtimeContext, PTable joinedTable,
PTable lhsTable, PTable rhsTable, int rhsFieldPosition) {
super(lhs);
if (joinType != JoinType.Inner && joinType != JoinType.Left && joinType != JoinType.Semi && joinType != JoinType.Anti)
throw new IllegalArgumentException("Unsupported join type '" + joinType + "' by CorrelatePlan");

this.rhs = rhs;
this.variableId = variableId;
this.joinType = joinType;
this.isSingleValueOnly = isSingleValueOnly;
this.runtimeContext = runtimeContext;
this.joinedSchema = buildSchema(joinedTable);
this.lhsSchema = buildSchema(lhsTable);
this.rhsSchema = buildSchema(rhsTable);
this.rhsFieldPosition = rhsFieldPosition;
}

private static KeyValueSchema buildSchema(PTable table) {
KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
if (table != null) {
for (PColumn column : table.getColumns()) {
if (!SchemaUtil.isPKColumn(column)) {
builder.addField(column);
}
}
}
return builder.build();
}

@Override
public ExplainPlan getExplainPlan() throws SQLException {
List<String> steps = Lists.newArrayList();
steps.add("NESTED-LOOP-JOIN (" + joinType.toString().toUpperCase() + ") TABLES");
for (String step : delegate.getExplainPlan().getPlanSteps()) {
steps.add(" " + step);
}
steps.add("AND" + (rhsSchema.getFieldCount() == 0 ? " (SKIP MERGE)" : ""));
for (String step : rhs.getExplainPlan().getPlanSteps()) {
steps.add(" " + step);
}
return new ExplainPlan(steps);
}

@Override
public ResultIterator iterator() throws SQLException {
return iterator(DefaultParallelScanGrouper.getInstance());
}

@Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper)
throws SQLException {
return new ResultIterator() {
private final ValueBitSet destBitSet = ValueBitSet.newInstance(joinedSchema);
private final ValueBitSet lhsBitSet = ValueBitSet.newInstance(lhsSchema);
private final ValueBitSet rhsBitSet =
(joinType == JoinType.Semi || joinType == JoinType.Anti) ?
ValueBitSet.EMPTY_VALUE_BITSET
: ValueBitSet.newInstance(rhsSchema);
private final ResultIterator iter = delegate.iterator();
private ResultIterator rhsIter = null;
private Tuple current = null;
private boolean closed = false;

@Override
public void close() throws SQLException {
if (!closed) {
closed = true;
iter.close();
if (rhsIter != null) {
rhsIter.close();
}
}
}

@Override
public Tuple next() throws SQLException {
if (closed)
return null;

Tuple rhsCurrent = null;
if (rhsIter != null) {
rhsCurrent = rhsIter.next();
if (rhsCurrent == null) {
rhsIter.close();
rhsIter = null;
} else if (isSingleValueOnly) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException();
}
}
while (rhsIter == null) {
current = iter.next();
if (current == null) {
close();
return null;
}
runtimeContext.setCorrelateVariableValue(variableId, current);
rhsIter = rhs.iterator();
rhsCurrent = rhsIter.next();
if ((rhsCurrent == null && (joinType == JoinType.Inner || joinType == JoinType.Semi))
|| (rhsCurrent != null && joinType == JoinType.Anti)) {
rhsIter.close();
rhsIter = null;
}
}

Tuple joined;
try {
joined = rhsBitSet == ValueBitSet.EMPTY_VALUE_BITSET ?
current : TupleProjector.mergeProjectedValue(
convertLhs(current), joinedSchema, destBitSet,
rhsCurrent, rhsSchema, rhsBitSet, rhsFieldPosition);
} catch (IOException e) {
throw new SQLException(e);
}

if ((joinType == JoinType.Semi || rhsCurrent == null) && rhsIter != null) {
rhsIter.close();
rhsIter = null;
}

return joined;
}

@Override
public void explain(List<String> planSteps) {
}

private ProjectedValueTuple convertLhs(Tuple lhs) throws IOException {
ProjectedValueTuple t;
if (lhs instanceof ProjectedValueTuple) {
t = (ProjectedValueTuple) lhs;
} else {
ImmutableBytesWritable ptr = getContext().getTempPtr();
TupleProjector.decodeProjectedValue(lhs, ptr);
lhsBitSet.clear();
lhsBitSet.or(ptr);
int bitSetLen = lhsBitSet.getEstimatedLength();
t = new ProjectedValueTuple(lhs, lhs.getValue(0).getTimestamp(),
ptr.get(), ptr.getOffset(), ptr.getLength(), bitSetLen);

}
return t;
}
};
}

@Override
public Integer getLimit() {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
public class DegenerateQueryPlan extends BaseQueryPlan {

public DegenerateQueryPlan(StatementContext context, FilterableStatement statement, TableRef table) {
super(context, statement, table, RowProjector.EMPTY_PROJECTOR, PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null);
super(context, statement, table, RowProjector.EMPTY_PROJECTOR, PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null, null);
context.setScanRanges(ScanRanges.NOTHING);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ public class HashJoinPlan extends DelegateQueryPlan {
private final HashJoinInfo joinInfo;
private final SubPlan[] subPlans;
private final boolean recompileWhereClause;
private final int maxServerCacheTimeToLive;
private List<SQLCloseable> dependencies;
private HashCacheClient hashClient;
private int maxServerCacheTimeToLive;
private AtomicLong firstJobEndTime;
private List<Expression> keyRangeExpressions;

Expand Down Expand Up @@ -114,6 +114,8 @@ private HashJoinPlan(SelectStatement statement,
this.joinInfo = joinInfo;
this.subPlans = subPlans;
this.recompileWhereClause = recompileWhereClause;
this.maxServerCacheTimeToLive = plan.getContext().getConnection().getQueryServices().getProps().getInt(
QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
}

@Override
Expand All @@ -130,8 +132,9 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLExcept
List<Future<Object>> futures = Lists.<Future<Object>>newArrayListWithExpectedSize(count);
dependencies = Lists.newArrayList();
if (joinInfo != null) {
hashClient = new HashCacheClient(delegate.getContext().getConnection());
maxServerCacheTimeToLive = services.getProps().getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
hashClient = hashClient != null ?
hashClient
: new HashCacheClient(delegate.getContext().getConnection());
firstJobEndTime = new AtomicLong(0);
keyRangeExpressions = new CopyOnWriteArrayList<Expression>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,20 @@
import org.apache.phoenix.schema.tuple.Tuple;

public class LiteralResultIterationPlan extends BaseQueryPlan {
protected final Iterator<Tuple> tupleIterator;
protected final Iterable<Tuple> tuples;

public LiteralResultIterationPlan(StatementContext context,
FilterableStatement statement, TableRef tableRef, RowProjector projection,
Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory) {
this(Collections.<Tuple> singletonList(new SingleKeyValueTuple(KeyValue.LOWESTKEY)).iterator(),
this(Collections.<Tuple> singletonList(new SingleKeyValueTuple(KeyValue.LOWESTKEY)),
context, statement, tableRef, projection, limit, orderBy, parallelIteratorFactory);
}

public LiteralResultIterationPlan(Iterator<Tuple> tupleIterator, StatementContext context,
public LiteralResultIterationPlan(Iterable<Tuple> tuples, StatementContext context,
FilterableStatement statement, TableRef tableRef, RowProjector projection,
Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory) {
super(context, statement, tableRef, projection, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory);
this.tupleIterator = tupleIterator;
super(context, statement, tableRef, projection, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory, null);
this.tuples = tuples;
}

@Override
Expand All @@ -74,6 +74,7 @@ public boolean useRoundRobinIterator() throws SQLException {
protected ResultIterator newIterator(ParallelScanGrouper scanGrouper)
throws SQLException {
ResultIterator scanner = new ResultIterator() {
private final Iterator<Tuple> tupleIterator = tuples.iterator();
private boolean closed = false;
private int count = 0;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.execute;

import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;

/**
 * Holds correlate variables for query execution: a definition and a current
 * value per variable id.
 * <p>
 * NOTE(review): the precise contract (lifetime, thread-safety, behavior for
 * unknown ids) is not visible from this interface; confirm against the
 * implementing class.
 */
public interface RuntimeContext {

    /**
     * Associates {@code variableId} with a table reference definition.
     *
     * @param variableId id of the correlate variable
     * @param def table reference that defines the variable
     */
    void defineCorrelateVariable(String variableId, TableRef def);

    /**
     * @param variableId id of the correlate variable
     * @return the definition for {@code variableId}; presumably the one passed
     *         to {@link #defineCorrelateVariable(String, TableRef)} (verify in
     *         the implementation)
     */
    TableRef getCorrelateVariableDef(String variableId);

    /**
     * Sets the current value of a correlate variable.
     *
     * @param variableId id of the correlate variable
     * @param value tuple to use as the variable's current value
     */
    void setCorrelateVariableValue(String variableId, Tuple value);

    /**
     * @param variableId id of the correlate variable
     * @return the current value for {@code variableId}; presumably the one last
     *         set through {@link #setCorrelateVariableValue(String, Tuple)}
     *         (verify in the implementation)
     */
    Tuple getCorrelateVariableValue(String variableId);

}
Loading

0 comments on commit 2acb38a

Please sign in to comment.